• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

MinaProtocol / mina / 242

04 Jun 2025 08:45PM UTC coverage: 36.779% (-19.8%) from 56.614%
242

push

buildkite

web-flow
Merge pull request #17086 from MinaProtocol/dkijania/port_terraform_removal

port terraform removal to develop

26262 of 71405 relevant lines covered (36.78%)

34615.67 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.83
/src/lib/bootstrap_controller/bootstrap_controller.ml
1
(* Only show stdout for failed inline tests. *)
8✔
2
open Core
3
open Async
4
open Mina_base
5
module Ledger = Mina_ledger.Ledger
6
module Sync_ledger = Mina_ledger.Sync_ledger
7
open Mina_state
8
open Pipe_lib.Strict_pipe
9
open Network_peer
10
module Transition_cache = Transition_cache
11

12
module type CONTEXT = sig
13
  val logger : Logger.t
14

15
  val precomputed_values : Precomputed_values.t
16

17
  val constraint_constants : Genesis_constants.Constraint_constants.t
18

19
  val consensus_constants : Consensus.Constants.t
20

21
  val ledger_sync_config : Syncable_ledger.daemon_config
22

23
  val proof_cache_db : Proof_cache_tag.cache_db
24
end
25

26
type Structured_log_events.t += Bootstrap_complete
27
  [@@deriving register_event { msg = "Bootstrap state: complete." }]
3✔
28

29
type t =
30
  { context : (module CONTEXT)
31
  ; trust_system : Trust_system.t
32
  ; verifier : Verifier.t
33
  ; mutable best_seen_transition : Mina_block.initial_valid_block
34
  ; mutable current_root : Mina_block.initial_valid_block
35
  ; network : Mina_networking.t
36
  ; mutable num_of_root_snarked_ledger_retargeted : int
37
  }
38

39
type time = Time.Span.t
40

41
let time_to_yojson span =
42
  `String (Printf.sprintf "%f seconds" (Time.Span.to_sec span))
×
43

44
type opt_time = time option
45

46
let opt_time_to_yojson = function
47
  | Some time ->
×
48
      time_to_yojson time
49
  | None ->
×
50
      `Null
51

52
(** An auxiliary data structure for collecting various metrics for bootstrap controller. *)
53
type bootstrap_cycle_stats =
×
54
  { cycle_result : string
×
55
  ; sync_ledger_time : time
×
56
  ; staged_ledger_data_download_time : time
×
57
  ; staged_ledger_construction_time : opt_time
×
58
  ; local_state_sync_required : bool
×
59
  ; local_state_sync_time : opt_time
×
60
  }
61
[@@deriving to_yojson]
62

63
let time_deferred deferred =
64
  let start_time = Time.now () in
×
65
  let%map result = deferred in
66
  let end_time = Time.now () in
×
67
  (Time.diff end_time start_time, result)
×
68

69
let worth_getting_root ({ context = (module Context); _ } as t) candidate =
70
  let module Consensus_context = struct
×
71
    include Context
72

73
    let logger =
74
      Logger.extend logger
×
75
        [ ( "selection_context"
76
          , `String "Bootstrap_controller.worth_getting_root" )
77
        ]
78
  end in
79
  Consensus.Hooks.equal_select_status `Take
80
  @@ Consensus.Hooks.select
81
       ~context:(module Consensus_context)
82
       ~existing:
83
         ( t.best_seen_transition |> Mina_block.Validation.block_with_hash
×
84
         |> With_hash.map ~f:Mina_block.consensus_state )
×
85
       ~candidate
86

87
let received_bad_proof ({ context = (module Context); _ } as t) host e =
88
  let open Context in
×
89
  Trust_system.(
90
    record t.trust_system logger host
91
      Actions.
92
        ( Violated_protocol
93
        , Some
94
            ( "Bad ancestor proof: $error"
95
            , [ ("error", Error_json.error_to_yojson e) ] ) ))
×
96

97
let done_syncing_root root_sync_ledger =
98
  Option.is_some (Sync_ledger.Db.peek_valid_tree root_sync_ledger)
×
99

100
let should_sync ~root_sync_ledger t candidate_state =
101
  (not @@ done_syncing_root root_sync_ledger)
×
102
  && worth_getting_root t candidate_state
×
103

104
(** Update [Synced_ledger]'s target and [best_seen_transition] and [current_root] accordingly. *)
105
let start_sync_job_with_peer ~sender ~root_sync_ledger
106
    ({ context = (module Context); _ } as t) peer_best_tip peer_root =
107
  let open Context in
×
108
  let%bind () =
109
    Trust_system.(
110
      record t.trust_system logger sender
×
111
        Actions.
112
          ( Fulfilled_request
113
          , Some ("Received verified peer root and best tip", []) ))
114
  in
115
  t.best_seen_transition <- peer_best_tip ;
×
116
  t.current_root <- peer_root ;
117
  let blockchain_state =
118
    t.current_root |> Mina_block.Validation.block |> Mina_block.header
×
119
    |> Mina_block.Header.protocol_state |> Protocol_state.blockchain_state
×
120
  in
121
  let expected_staged_ledger_hash =
×
122
    blockchain_state |> Blockchain_state.staged_ledger_hash
123
  in
124
  let snarked_ledger_hash =
×
125
    blockchain_state |> Blockchain_state.snarked_ledger_hash
126
  in
127
  return
×
128
  @@
129
  match
130
    Sync_ledger.Db.new_goal root_sync_ledger
131
      (Frozen_ledger_hash.to_ledger_hash snarked_ledger_hash)
×
132
      ~data:
133
        ( State_hash.With_state_hashes.state_hash
×
134
          @@ Mina_block.Validation.block_with_hash t.current_root
×
135
        , sender
136
        , expected_staged_ledger_hash )
137
      ~equal:(fun (hash1, _, _) (hash2, _, _) -> State_hash.equal hash1 hash2)
×
138
  with
139
  | `New ->
×
140
      t.num_of_root_snarked_ledger_retargeted <-
141
        t.num_of_root_snarked_ledger_retargeted + 1 ;
142
      `Syncing_new_snarked_ledger
143
  | `Update_data ->
×
144
      `Updating_root_transition
145
  | `Repeat ->
×
146
      `Ignored
147

148
let to_consensus_state h =
149
  Mina_block.Header.protocol_state h |> Protocol_state.consensus_state
×
150

151
(** For each transition, this function would compare it with the existing one.
152
    If the incoming transition is better, then download the merkle list from
153
    that transition to its root and verify it. If we get a better root than
154
    the existing one, then reset the Sync_ledger's target by calling
155
    [start_sync_job_with_peer] function. *)
156
let on_transition ({ context = (module Context); _ } as t) ~sender
157
    ~root_sync_ledger candidate_header =
158
  let open Context in
×
159
  let candidate_consensus_state =
160
    With_hash.map ~f:to_consensus_state candidate_header
161
  in
162
  if not @@ should_sync ~root_sync_ledger t candidate_consensus_state then
×
163
    Deferred.return `Ignored
×
164
  else
165
    match%bind
166
      Mina_networking.get_ancestry t.network sender.Peer.peer_id
×
167
        (With_hash.map_hash candidate_consensus_state
×
168
           ~f:State_hash.State_hashes.state_hash )
169
    with
170
    | Error e ->
×
171
        [%log error]
×
172
          ~metadata:[ ("error", Error_json.error_to_yojson e) ]
×
173
          !"Could not get the proof of the root transition from the network: \
174
            $error" ;
175
        Deferred.return `Ignored
×
176
    | Ok peer_root_with_proof -> (
×
177
        let pcd =
178
          peer_root_with_proof.data
179
          |> Proof_carrying_data.map
×
180
               ~f:(Mina_block.write_all_proofs_to_disk ~proof_cache_db)
181
          |> Proof_carrying_data.map_proof
182
               ~f:
183
                 (Tuple2.map_snd
184
                    ~f:(Mina_block.write_all_proofs_to_disk ~proof_cache_db) )
185
        in
186
        match%bind
187
          Mina_block.verify_on_header
×
188
            ~verify:
189
              (Sync_handler.Root.verify
×
190
                 ~context:(module Context)
191
                 ~verifier:t.verifier candidate_consensus_state )
192
            pcd
193
        with
194
        | Ok (`Root root, `Best_tip best_tip) ->
×
195
            if done_syncing_root root_sync_ledger then return `Ignored
×
196
            else
197
              start_sync_job_with_peer ~sender ~root_sync_ledger t best_tip root
×
198
        | Error e ->
×
199
            return (received_bad_proof t sender e |> Fn.const `Ignored) )
×
200

201
(** A helper function that wraps the calls to Sync_ledger and iterate through
202
    incoming transitions, add those to the transition_cache and calls
203
    [on_transition] function. *)
204
let sync_ledger ({ context = (module Context); _ } as t) ~preferred
205
    ~root_sync_ledger ~transition_graph ~sync_ledger_reader =
206
  let open Context in
×
207
  let query_reader = Sync_ledger.Db.query_reader root_sync_ledger in
208
  let response_writer = Sync_ledger.Db.answer_writer root_sync_ledger in
×
209
  Mina_networking.glue_sync_ledger ~preferred t.network query_reader
×
210
    response_writer ;
211
  Reader.iter sync_ledger_reader ~f:(fun (b_or_h, `Valid_cb vc) ->
×
212
      let header_with_hash, sender, transition_cache_element =
×
213
        match b_or_h with
214
        | `Block b_env ->
×
215
            ( Envelope.Incoming.data b_env
×
216
              |> Mina_block.Validation.block_with_hash
×
217
              |> With_hash.map ~f:Mina_block.header
×
218
            , Envelope.Incoming.remote_sender_exn b_env
×
219
            , Envelope.Incoming.map ~f:(fun x -> `Block x) b_env )
×
220
        | `Header h_env ->
×
221
            ( Envelope.Incoming.data h_env
×
222
              |> Mina_block.Validation.header_with_hash
×
223
            , Envelope.Incoming.remote_sender_exn h_env
×
224
            , Envelope.Incoming.map ~f:(fun x -> `Header x) h_env )
×
225
      in
226
      let previous_state_hash =
227
        With_hash.data header_with_hash
×
228
        |> Mina_block.Header.protocol_state
×
229
        |> Protocol_state.previous_state_hash
230
      in
231
      Transition_cache.add transition_graph ~parent:previous_state_hash
×
232
        (transition_cache_element, vc) ;
233
      (* TODO: Efficiently limiting the number of green threads in #1337 *)
234
      if
×
235
        worth_getting_root t
236
          (With_hash.map ~f:to_consensus_state header_with_hash)
×
237
      then (
×
238
        [%log trace] "Added the transition from sync_ledger_reader into cache"
×
239
          ~metadata:
240
            [ ( "state_hash"
241
              , State_hash.to_yojson
×
242
                  (State_hash.With_state_hashes.state_hash header_with_hash) )
×
243
            ; ( "header"
244
              , Mina_block.Header.to_yojson (With_hash.data header_with_hash) )
×
245
            ] ;
246

247
        Deferred.ignore_m
×
248
        @@ on_transition t ~sender ~root_sync_ledger header_with_hash )
×
249
      else Deferred.unit )
×
250

251
let external_transition_compare ~context:(module Context : CONTEXT) =
252
  let get_consensus_state =
×
253
    Fn.compose Protocol_state.consensus_state Mina_block.Header.protocol_state
254
  in
255
  Comparable.lift
×
256
    (fun existing candidate ->
257
      (* To prevent the logger to spam a lot of messages, the logger input is set to null *)
258
      if
×
259
        State_hash.equal
260
          (State_hash.With_state_hashes.state_hash existing)
×
261
          (State_hash.With_state_hashes.state_hash candidate)
×
262
      then 0
×
263
      else if
×
264
        Consensus.Hooks.equal_select_status `Keep
265
        @@ Consensus.Hooks.select ~context:(module Context) ~existing ~candidate
266
      then -1
×
267
      else 1 )
×
268
    ~f:(With_hash.map ~f:get_consensus_state)
269

270
let download_snarked_ledger ~trust_system ~preferred_peers ~transition_graph
271
    ~sync_ledger_reader ~context t temp_snarked_ledger =
272
  time_deferred
×
273
    (let root_sync_ledger =
274
       Sync_ledger.Db.create temp_snarked_ledger ~context ~trust_system
275
     in
276
     don't_wait_for
×
277
       (sync_ledger t ~preferred:preferred_peers ~root_sync_ledger
×
278
          ~transition_graph ~sync_ledger_reader ) ;
279
     (* We ignore the resulting ledger returned here since it will always
280
        * be the same as the ledger we started with because we are syncing
281
        * a db ledger. *)
282
     let%map _, data = Sync_ledger.Db.valid_tree root_sync_ledger in
×
283
     Sync_ledger.Db.destroy root_sync_ledger ;
×
284
     data )
×
285

286
(** Run one bootstrap cycle *)
287
let run_cycle ~context:(module Context : CONTEXT) ~trust_system ~verifier
288
    ~network ~consensus_local_state ~transition_reader ~preferred_peers
289
    ~persistent_root ~persistent_frontier ~initial_root_transition ~catchup_mode
290
    previous_cycles =
291
  let open Context in
×
292
  let sync_ledger_reader, sync_ledger_writer =
293
    create ~name:"sync ledger pipe" Synchronous
294
  in
295
  don't_wait_for
×
296
    (transfer_while_writer_alive transition_reader sync_ledger_writer ~f:Fn.id) ;
×
297
  let initial_root_transition =
×
298
    initial_root_transition |> Mina_block.Validated.remember
×
299
    |> Mina_block.Validation.reset_frontier_dependencies_validation
×
300
    |> Mina_block.Validation.reset_staged_ledger_diff_validation
301
  in
302
  let t =
×
303
    { network
304
    ; context = (module Context)
305
    ; trust_system
306
    ; verifier
307
    ; best_seen_transition = initial_root_transition
308
    ; current_root = initial_root_transition
309
    ; num_of_root_snarked_ledger_retargeted = 0
310
    }
311
  in
312
  let transition_graph = Transition_cache.create () in
313
  let temp_persistent_root_instance =
×
314
    Transition_frontier.Persistent_root.create_instance_exn persistent_root
315
  in
316
  let temp_snarked_ledger =
×
317
    Transition_frontier.Persistent_root.Instance.snarked_ledger
318
      temp_persistent_root_instance
319
  in
320
  (* step 1. download snarked_ledger *)
321
  let%bind sync_ledger_time, (hash, sender, expected_staged_ledger_hash) =
322
    download_snarked_ledger
×
323
      ~context:(module Context)
324
      ~trust_system ~preferred_peers ~transition_graph ~sync_ledger_reader t
325
      temp_snarked_ledger
326
  in
327
  Mina_metrics.(
×
328
    Counter.inc Bootstrap.root_snarked_ledger_sync_ms
×
329
      Time.Span.(to_ms sync_ledger_time)) ;
×
330
  Mina_metrics.(
331
    Gauge.set Bootstrap.num_of_root_snarked_ledger_retargeted
×
332
      (Float.of_int t.num_of_root_snarked_ledger_retargeted)) ;
×
333
  (* step 2. Download scan state and pending coinbases. *)
334
  let%bind ( staged_ledger_data_download_time
335
           , staged_ledger_construction_time
336
           , staged_ledger_aux_result ) =
337
    let%bind ( staged_ledger_data_download_time
338
             , staged_ledger_data_download_result ) =
339
      time_deferred
×
340
        (Mina_networking.get_staged_ledger_aux_and_pending_coinbases_at_hash
×
341
           t.network sender.peer_id hash )
342
    in
343
    match staged_ledger_data_download_result with
×
344
    | Error err ->
×
345
        Deferred.return (staged_ledger_data_download_time, None, Error err)
346
    | Ok
×
347
        ( scan_state_uncached
348
        , expected_merkle_root
349
        , pending_coinbases
350
        , protocol_states ) -> (
351
        let%map staged_ledger_construction_result =
352
          O1trace.thread "construct_root_staged_ledger" (fun () ->
×
353
              let open Deferred.Or_error.Let_syntax in
×
354
              let received_staged_ledger_hash =
355
                Staged_ledger_hash.of_aux_ledger_and_coinbase_hash
356
                  (Staged_ledger.Scan_state.Stable.Latest.hash
×
357
                     scan_state_uncached )
358
                  expected_merkle_root pending_coinbases
359
              in
360
              [%log debug]
×
361
                ~metadata:
362
                  [ ( "expected_staged_ledger_hash"
363
                    , Staged_ledger_hash.to_yojson expected_staged_ledger_hash
×
364
                    )
365
                  ; ( "received_staged_ledger_hash"
366
                    , Staged_ledger_hash.to_yojson received_staged_ledger_hash
×
367
                    )
368
                  ]
369
                "Comparing $expected_staged_ledger_hash to \
370
                 $received_staged_ledger_hash" ;
371
              let%bind new_root =
372
                t.current_root
373
                |> Mina_block.Validation.skip_frontier_dependencies_validation
×
374
                     `This_block_belongs_to_a_detached_subtree
375
                |> Mina_block.Validation.validate_staged_ledger_hash
×
376
                     (`Staged_ledger_already_materialized
377
                       received_staged_ledger_hash )
378
                |> Result.map_error ~f:(fun _ ->
×
379
                       Error.of_string "received faulty scan state from peer" )
×
380
                |> Deferred.return
×
381
              in
382
              let protocol_states =
×
383
                List.map protocol_states
384
                  ~f:(With_hash.of_data ~hash_data:Protocol_state.hashes)
385
              in
386
              let scan_state =
×
387
                Staged_ledger.Scan_state.write_all_proofs_to_disk
388
                  ~proof_cache_db scan_state_uncached
389
              in
390
              let%bind protocol_states =
391
                Staged_ledger.Scan_state.check_required_protocol_states
×
392
                  scan_state ~protocol_states
393
                |> Deferred.return
×
394
              in
395
              let protocol_states_map =
×
396
                protocol_states
397
                |> List.map ~f:(fun ps ->
×
398
                       (State_hash.With_state_hashes.state_hash ps, ps) )
×
399
                |> State_hash.Map.of_alist_exn
400
              in
401
              let get_state hash =
×
402
                match Map.find protocol_states_map hash with
×
403
                | None ->
×
404
                    let new_state_hash =
405
                      State_hash.With_state_hashes.state_hash (fst new_root)
×
406
                    in
407
                    [%log error]
×
408
                      ~metadata:
409
                        [ ("new_root", State_hash.to_yojson new_state_hash)
×
410
                        ; ("state_hash", State_hash.to_yojson hash)
×
411
                        ]
412
                      "Protocol state (for scan state transactions) for \
413
                       $state_hash not found when bootstrapping to the new \
414
                       root $new_root" ;
415
                    Or_error.errorf
×
416
                      !"Protocol state (for scan state transactions) for \
×
417
                        %{sexp:State_hash.t} not found when bootstrapping to \
418
                        the new root %{sexp:State_hash.t}"
419
                      hash new_state_hash
420
                | Some protocol_state ->
×
421
                    Ok (With_hash.data protocol_state)
×
422
              in
423
              (* step 3. Construct staged ledger from snarked ledger, scan state
424
                 and pending coinbases. *)
425
              (* Construct the staged ledger before constructing the transition
426
               * frontier in order to verify the scan state we received.
427
               * TODO: reorganize the code to avoid doing this twice (#3480) *)
428
              let open Deferred.Let_syntax in
429
              let%map staged_ledger_construction_time, construction_result =
430
                time_deferred
×
431
                  (let open Deferred.Let_syntax in
432
                  let temp_mask = Ledger.of_database temp_snarked_ledger in
433
                  let%map result =
434
                    Staged_ledger
435
                    .of_scan_state_pending_coinbases_and_snarked_ledger ~logger
436
                      ~snarked_local_state:
437
                        Mina_block.(
438
                          t.current_root |> Validation.block |> header
×
439
                          |> Header.protocol_state
×
440
                          |> Protocol_state.blockchain_state
×
441
                          |> Blockchain_state.snarked_local_state)
×
442
                      ~verifier ~constraint_constants ~scan_state
443
                      ~snarked_ledger:temp_mask ~expected_merkle_root
444
                      ~pending_coinbases ~get_state
445
                  in
446
                  ignore
×
447
                    ( Ledger.Maskable.unregister_mask_exn ~loc:__LOC__ temp_mask
×
448
                      : Ledger.unattached_mask ) ;
449
                  Result.map result
450
                    ~f:
451
                      (const
×
452
                         ( scan_state
453
                         , pending_coinbases
454
                         , new_root
455
                         , protocol_states ) ))
456
              in
457
              Ok (staged_ledger_construction_time, construction_result) )
×
458
        in
459
        match staged_ledger_construction_result with
×
460
        | Error err ->
×
461
            (staged_ledger_data_download_time, None, Error err)
462
        | Ok (staged_ledger_construction_time, result) ->
×
463
            ( staged_ledger_data_download_time
464
            , Some staged_ledger_construction_time
465
            , result ) )
466
  in
467
  Transition_frontier.Persistent_root.Instance.close
×
468
    temp_persistent_root_instance ;
469
  match staged_ledger_aux_result with
×
470
  | Error e ->
×
471
      let%map () =
472
        Trust_system.(
473
          record t.trust_system logger sender
×
474
            Actions.
475
              ( Outgoing_connection_error
476
              , Some
477
                  ( "Can't find scan state from the peer or received faulty \
478
                     scan state from the peer."
479
                  , [] ) ))
480
      in
481
      [%log error]
×
482
        ~metadata:
483
          [ ("error", Error_json.error_to_yojson e)
×
484
          ; ("state_hash", State_hash.to_yojson hash)
×
485
          ; ( "expected_staged_ledger_hash"
486
            , Staged_ledger_hash.to_yojson expected_staged_ledger_hash )
×
487
          ]
488
        "Failed to find scan state for the transition with hash $state_hash \
489
         from the peer or received faulty scan state: $error. Retry bootstrap" ;
490
      Writer.close sync_ledger_writer ;
×
491
      let this_cycle =
×
492
        { cycle_result = "failed to download and construct scan state"
493
        ; sync_ledger_time
494
        ; staged_ledger_data_download_time
495
        ; staged_ledger_construction_time
496
        ; local_state_sync_required = false
497
        ; local_state_sync_time = None
498
        }
499
      in
500
      `Repeat (this_cycle :: previous_cycles)
501
  | Ok (scan_state, pending_coinbase, new_root, protocol_states) -> (
×
502
      let%bind () =
503
        Trust_system.(
504
          record t.trust_system logger sender
×
505
            Actions.
506
              ( Fulfilled_request
507
              , Some ("Received valid scan state from peer", []) ))
508
      in
509
      let best_seen_block_with_hash, _ = t.best_seen_transition in
×
510
      let consensus_state =
511
        With_hash.data best_seen_block_with_hash
×
512
        |> Mina_block.header |> Mina_block.Header.protocol_state
×
513
        |> Protocol_state.consensus_state
514
      in
515
      (* step 4. Synchronize consensus local state if necessary *)
516
      let%bind ( local_state_sync_time
517
               , (local_state_sync_required, local_state_sync_result) ) =
518
        time_deferred
×
519
          ( match
520
              Consensus.Hooks.required_local_state_sync
521
                ~constants:precomputed_values.consensus_constants
522
                ~consensus_state ~local_state:consensus_local_state
523
            with
524
          | None ->
×
525
              [%log debug]
×
526
                ~metadata:
527
                  [ ( "local_state"
528
                    , Consensus.Data.Local_state.to_yojson consensus_local_state
×
529
                    )
530
                  ; ( "consensus_state"
531
                    , Consensus.Data.Consensus_state.Value.to_yojson
×
532
                        consensus_state )
533
                  ]
534
                "Not synchronizing consensus local state" ;
535
              Deferred.return (false, Or_error.return ())
×
536
          | Some sync_jobs ->
×
537
              [%log info] "Synchronizing consensus local state" ;
×
538
              let%map result =
539
                Consensus.Hooks.sync_local_state
×
540
                  ~context:(module Context)
541
                  ~local_state:consensus_local_state ~trust_system
542
                  ~glue_sync_ledger:(Mina_networking.glue_sync_ledger t.network)
×
543
                  sync_jobs
544
              in
545
              (true, result) )
×
546
      in
547
      match local_state_sync_result with
×
548
      | Error e ->
×
549
          [%log error]
×
550
            ~metadata:[ ("error", Error_json.error_to_yojson e) ]
×
551
            "Local state sync failed: $error. Retry bootstrap" ;
552
          Writer.close sync_ledger_writer ;
×
553
          let this_cycle =
×
554
            { cycle_result = "failed to synchronize local state"
555
            ; sync_ledger_time
556
            ; staged_ledger_data_download_time
557
            ; staged_ledger_construction_time
558
            ; local_state_sync_required
559
            ; local_state_sync_time = Some local_state_sync_time
560
            }
561
          in
562
          Deferred.return (`Repeat (this_cycle :: previous_cycles))
563
      | Ok () ->
×
564
          (* step 5. Close the old frontier and reload a new one from disk. *)
565
          let new_root_data : Transition_frontier.Root_data.Limited.t =
566
            Transition_frontier.Root_data.Limited.create
567
              ~transition:(Mina_block.Validated.lift new_root)
×
568
              ~scan_state ~pending_coinbase ~protocol_states
569
          in
570
          let%bind () =
571
            Transition_frontier.Persistent_frontier.reset_database_exn
×
572
              persistent_frontier ~root_data:new_root_data
573
              ~genesis_state_hash:
574
                (State_hash.With_state_hashes.state_hash
×
575
                   precomputed_values.protocol_state_with_hashes )
576
          in
577
          (* TODO: lazy load db in persistent root to avoid unnecessary opens like this *)
578
          Transition_frontier.Persistent_root.(
×
579
            with_instance_exn persistent_root ~f:(fun instance ->
×
580
                Instance.set_root_state_hash instance
×
581
                @@ Mina_block.Validated.state_hash
×
582
                @@ Mina_block.Validated.lift new_root )) ;
×
583
          let%map new_frontier =
584
            let fail msg =
585
              failwith
×
586
                ( "failed to initialize transition frontier after \
587
                   bootstrapping: " ^ msg )
588
            in
589
            Transition_frontier.load
×
590
              ~context:(module Context)
591
              ~retry_with_fresh_db:false ~verifier ~consensus_local_state
592
              ~persistent_root ~persistent_frontier ~catchup_mode ()
593
            >>| function
×
594
            | Ok frontier ->
×
595
                frontier
596
            | Error (`Failure msg) ->
×
597
                fail msg
598
            | Error `Bootstrap_required ->
×
599
                fail
600
                  "bootstrap still required (indicates logical error in code)"
601
            | Error `Persistent_frontier_malformed ->
×
602
                fail "persistent frontier was malformed"
603
            | Error `Snarked_ledger_mismatch ->
×
604
                fail
605
                  "this should not happen, because we just reset the \
606
                   snarked_ledger"
607
          in
608
          [%str_log info] Bootstrap_complete ;
×
609
          let collected_transitions = Transition_cache.data transition_graph in
×
610
          let logger =
×
611
            Logger.extend logger
612
              [ ("context", `String "Filter collected transitions in bootstrap")
613
              ]
614
          in
615
          let root_consensus_state =
×
616
            Transition_frontier.(
617
              Breadcrumb.consensus_state_with_hashes (root new_frontier))
×
618
          in
619
          let filtered_collected_transitions =
620
            List.filter collected_transitions
621
              ~f:(fun (incoming_transition, _) ->
622
                let transition =
×
623
                  Envelope.Incoming.data incoming_transition
×
624
                  |> Transition_cache.header_with_hash
625
                in
626
                Consensus.Hooks.equal_select_status `Take
×
627
                @@ Consensus.Hooks.select
628
                     ~context:(module Context)
629
                     ~existing:root_consensus_state
630
                     ~candidate:
631
                       (With_hash.map
×
632
                          ~f:
633
                            (Fn.compose Protocol_state.consensus_state
×
634
                               Mina_block.Header.protocol_state )
635
                          transition ) )
636
          in
637
          [%log debug] "Sorting filtered transitions by consensus state"
×
638
            ~metadata:[] ;
639
          let sorted_filtered_collected_transitions =
×
640
            O1trace.sync_thread "sorting_collected_transitions" (fun () ->
641
                List.sort filtered_collected_transitions
×
642
                  ~compare:
643
                    (Comparable.lift
×
644
                       ~f:(fun (x, _) ->
645
                         Transition_cache.header_with_hash
×
646
                         @@ Envelope.Incoming.data x )
×
647
                       (external_transition_compare ~context:(module Context)) ) )
648
          in
649
          let this_cycle =
×
650
            { cycle_result = "success"
651
            ; sync_ledger_time
652
            ; staged_ledger_data_download_time
653
            ; staged_ledger_construction_time
654
            ; local_state_sync_required
655
            ; local_state_sync_time = Some local_state_sync_time
656
            }
657
          in
658
          `Finished
659
            ( this_cycle :: previous_cycles
660
            , (new_frontier, sorted_filtered_collected_transitions) ) )
661

662
(** The entry point function for bootstrap controller. When bootstrap finished
663
    it would return a transition frontier with the root breadcrumb and a list
664
    of transitions collected during bootstrap.
665

666
    Bootstrap controller would do the following steps to contrust the
667
    transition frontier:
668
    1. Download the root snarked_ledger.
669
    2. Download the scan state and pending coinbases.
670
    3. Construct the staged ledger from the snarked ledger, scan state and
671
       pending coinbases.
672
    4. Synchronize the consensus local state if necessary.
673
    5. Close the old frontier and reload a new one from disk.
674
 *)
675
let run ~context:(module Context : CONTEXT) ~trust_system ~verifier ~network
676
    ~consensus_local_state ~transition_reader ~preferred_peers ~persistent_root
677
    ~persistent_frontier ~initial_root_transition ~catchup_mode =
678
  let open Context in
×
679
  let run_cycle =
680
    run_cycle
681
      ~context:(module Context : CONTEXT)
682
      ~trust_system ~verifier ~network ~consensus_local_state ~transition_reader
683
      ~preferred_peers ~persistent_root ~persistent_frontier
684
      ~initial_root_transition ~catchup_mode
685
  in
686
  O1trace.thread "bootstrap"
687
  @@ fun () ->
688
  let%map time_elapsed, (cycles, result) =
689
    time_deferred @@ Deferred.repeat_until_finished [] run_cycle
×
690
  in
691
  [%log info] "Bootstrap completed in $time_elapsed: $bootstrap_stats"
×
692
    ~metadata:
693
      [ ("time_elapsed", time_to_yojson time_elapsed)
×
694
      ; ( "bootstrap_stats"
695
        , `List (List.map ~f:bootstrap_cycle_stats_to_yojson cycles) )
×
696
      ] ;
697
  Mina_metrics.(
×
698
    Gauge.set Bootstrap.bootstrap_time_ms Core.Time.(Span.to_ms @@ time_elapsed)) ;
×
699
  result
700

701
let%test_module "Bootstrap_controller tests" =
702
  ( module struct
703
    open Pipe_lib
704

705
    let max_frontier_length =
706
      Transition_frontier.global_max_length Genesis_constants.For_unit_tests.t
×
707

708
    let logger = Logger.null ()
×
709

710
    let () =
711
      (* Disable log messages from best_tip_diff logger. *)
712
      Logger.Consumer_registry.register ~commit_id:""
×
713
        ~id:Logger.Logger_id.best_tip_diff ~processor:(Logger.Processor.raw ())
×
714
        ~transport:
715
          (Logger.Transport.create
×
716
             ( module struct
717
               type t = unit
718

719
               let transport () _ = ()
×
720
             end )
721
             () )
722
        ()
723

724
    let trust_system =
725
      let s = Trust_system.null () in
726
      don't_wait_for
×
727
        (Pipe_lib.Strict_pipe.Reader.iter
×
728
           (Trust_system.upcall_pipe s)
×
729
           ~f:(const Deferred.unit) ) ;
×
730
      s
×
731

732
    let precomputed_values = Lazy.force Precomputed_values.for_unit_tests
×
733

734
    let proof_level = precomputed_values.proof_level
735

736
    let constraint_constants = precomputed_values.constraint_constants
737

738
    let ledger_sync_config =
739
      Syncable_ledger.create_config
×
740
        ~compile_config:Mina_compile_config.For_unit_tests.t
741
        ~max_subtree_depth:None ~default_subtree_depth:None ()
742

743
    module Context = struct
744
      let logger = logger
745

746
      let precomputed_values = precomputed_values
747

748
      let constraint_constants =
749
        Genesis_constants.For_unit_tests.Constraint_constants.t
750

751
      let consensus_constants = precomputed_values.consensus_constants
752

753
      let ledger_sync_config =
754
        Syncable_ledger.create_config
×
755
          ~compile_config:Mina_compile_config.For_unit_tests.t
756
          ~max_subtree_depth:None ~default_subtree_depth:None ()
757

758
      let proof_cache_db = Proof_cache_tag.For_tests.create_db ()
×
759
    end
760

761
    let verifier =
762
      Async.Thread_safe.block_on_async_exn (fun () ->
×
763
          Verifier.For_tests.default ~constraint_constants ~logger ~proof_level
×
764
            () )
765

766
    module Genesis_ledger = (val precomputed_values.genesis_ledger)
767

768
    let downcast_transition ~sender transition =
769
      let transition =
×
770
        transition |> Mina_block.Validated.remember
×
771
        |> Mina_block.Validation.reset_frontier_dependencies_validation
×
772
        |> Mina_block.Validation.reset_staged_ledger_diff_validation
773
      in
774
      Envelope.Incoming.wrap ~data:transition
×
775
        ~sender:(Envelope.Sender.Remote sender)
776

777
    let downcast_breadcrumb ~sender breadcrumb =
778
      downcast_transition ~sender
×
779
        (Transition_frontier.Breadcrumb.validated_transition breadcrumb)
×
780

781
    let make_non_running_bootstrap ~genesis_root ~network =
782
      let transition =
×
783
        genesis_root
784
        |> Mina_block.Validation.reset_frontier_dependencies_validation
×
785
        |> Mina_block.Validation.reset_staged_ledger_diff_validation
786
      in
787
      { context = (module Context)
×
788
      ; trust_system
789
      ; verifier
790
      ; best_seen_transition = transition
791
      ; current_root = transition
792
      ; network
793
      ; num_of_root_snarked_ledger_retargeted = 0
794
      }
795

796
    let%test_unit "Bootstrap controller caches all transitions it is passed \
797
                   through the transition_reader" =
798
      let branch_size = (max_frontier_length * 2) + 2 in
×
799
      Quickcheck.test ~trials:1
800
        (let open Quickcheck.Generator.Let_syntax in
801
        (* we only need one node for this test, but we need more than one peer so that mina_networking does not throw an error *)
802
        let%bind fake_network =
803
          Fake_network.Generator.(
804
            gen ~precomputed_values ~verifier ~max_frontier_length
×
805
              ~ledger_sync_config [ fresh_peer; fresh_peer ])
806
        in
807
        let%map make_branch =
808
          Transition_frontier.Breadcrumb.For_tests.gen_seq ~precomputed_values
×
809
            ~verifier
810
            ~accounts_with_secret_keys:(Lazy.force Genesis_ledger.accounts)
×
811
            branch_size
812
        in
813
        let [ me; _ ] = fake_network.peer_networks in
×
814
        let branch =
815
          Async.Thread_safe.block_on_async_exn (fun () ->
816
              make_branch (Transition_frontier.root me.state.frontier) )
×
817
        in
818
        (fake_network, branch))
×
819
        ~f:(fun (fake_network, branch) ->
820
          let [ me; other ] = fake_network.peer_networks in
×
821
          let genesis_root =
822
            Transition_frontier.(
823
              Breadcrumb.validated_transition @@ root me.state.frontier)
×
824
            |> Mina_block.Validated.remember
825
          in
826
          let transition_graph = Transition_cache.create () in
×
827
          let sync_ledger_reader, sync_ledger_writer =
×
828
            Pipe_lib.Strict_pipe.create ~name:"sync_ledger_reader" Synchronous
829
          in
830
          let bootstrap =
×
831
            make_non_running_bootstrap ~genesis_root ~network:me.network
832
          in
833
          let root_sync_ledger =
834
            Sync_ledger.Db.create
835
              (Transition_frontier.root_snarked_ledger me.state.frontier)
×
836
              ~context:(module Context)
837
              ~trust_system
838
          in
839
          Async.Thread_safe.block_on_async_exn (fun () ->
×
840
              let sync_deferred =
×
841
                sync_ledger bootstrap ~root_sync_ledger ~transition_graph
842
                  ~preferred:[] ~sync_ledger_reader
843
              in
844
              let%bind () =
845
                Deferred.List.iter branch ~f:(fun breadcrumb ->
×
846
                    Strict_pipe.Writer.write sync_ledger_writer
×
847
                      ( `Block
848
                          (downcast_breadcrumb ~sender:other.peer breadcrumb)
×
849
                      , `Valid_cb None ) )
850
              in
851
              Strict_pipe.Writer.close sync_ledger_writer ;
×
852
              sync_deferred ) ;
×
853
          let expected_transitions =
×
854
            List.map branch
855
              ~f:
856
                (Fn.compose
×
857
                   (With_hash.map ~f:Mina_block.header)
858
                   (Fn.compose Mina_block.Validation.block_with_hash
×
859
                      (Fn.compose Mina_block.Validated.remember
×
860
                         Transition_frontier.Breadcrumb.validated_transition ) ) )
861
          in
862
          let saved_transitions =
×
863
            Transition_cache.data transition_graph
×
864
            |> List.map ~f:(fun (x, _) ->
865
                   Transition_cache.header_with_hash @@ Envelope.Incoming.data x )
×
866
          in
867
          let module E = struct
×
868
            module T = struct
869
              type t = Mina_block.Header.t State_hash.With_state_hashes.t
×
870
              [@@deriving sexp]
871

872
              let compare = external_transition_compare ~context:(module Context)
873
            end
874

875
            include Comparable.Make (T)
876
          end in
877
          [%test_result: E.Set.t]
×
878
            (E.Set.of_list saved_transitions)
×
879
            ~expect:(E.Set.of_list expected_transitions) )
×
880

881
    let run_bootstrap ~timeout_duration ~my_net ~transition_reader =
882
      let open Fake_network in
×
883
      let time_controller = Block_time.Controller.basic ~logger in
884
      let persistent_root =
885
        Transition_frontier.persistent_root my_net.state.frontier
886
      in
887
      let persistent_frontier =
×
888
        Transition_frontier.persistent_frontier my_net.state.frontier
889
      in
890
      let initial_root_transition =
×
891
        Transition_frontier.(
892
          Breadcrumb.validated_transition (root my_net.state.frontier))
×
893
      in
894
      let%bind () =
895
        Transition_frontier.close ~loc:__LOC__ my_net.state.frontier
×
896
      in
897
      [%log info] "bootstrap begin" ;
×
898
      Block_time.Timeout.await_exn time_controller ~timeout_duration
×
899
        (run
900
           ~context:(module Context)
901
           ~trust_system ~verifier ~network:my_net.network ~preferred_peers:[]
902
           ~consensus_local_state:my_net.state.consensus_local_state
903
           ~transition_reader ~persistent_root ~persistent_frontier
904
           ~catchup_mode:`Super ~initial_root_transition )
905

906
    let assert_transitions_increasingly_sorted ~root
907
        (incoming_transitions : Transition_cache.element list) =
908
      let root =
×
909
        Transition_frontier.Breadcrumb.block root |> Mina_block.header
×
910
      in
911
      ignore
×
912
        ( List.fold_result ~init:root incoming_transitions
×
913
            ~f:(fun max_acc incoming_transition ->
914
              let With_hash.{ data = header; _ } =
×
915
                Transition_cache.header_with_hash
916
                  (Envelope.Incoming.data @@ fst incoming_transition)
×
917
              in
918
              let header_len h =
×
919
                Mina_block.Header.protocol_state h
×
920
                |> Protocol_state.consensus_state
×
921
                |> Consensus.Data.Consensus_state.blockchain_length
922
              in
923
              let open Result.Let_syntax in
924
              let%map () =
925
                Result.ok_if_true
×
926
                  Mina_numbers.Length.(header_len max_acc <= header_len header)
×
927
                  ~error:
928
                    (Error.of_string
×
929
                       "The blocks are not sorted in increasing order" )
930
              in
931
              header )
×
932
          |> Or_error.ok_exn
×
933
          : Mina_block.Header.t )
934

935
    let%test_unit "sync with one node after receiving a transition" =
936
      Quickcheck.test ~trials:1
×
937
        Fake_network.Generator.(
938
          gen ~precomputed_values ~verifier ~max_frontier_length
×
939
            ~ledger_sync_config
940
            [ fresh_peer
941
            ; peer_with_branch
942
                ~frontier_branch_size:((max_frontier_length * 2) + 2)
943
            ])
944
        ~f:(fun fake_network ->
945
          let [ my_net; peer_net ] = fake_network.peer_networks in
×
946
          let transition_reader, transition_writer =
947
            Pipe_lib.Strict_pipe.create ~name:(__MODULE__ ^ __LOC__)
948
              (Buffered (`Capacity 10, `Overflow (Drop_head ignore)))
949
          in
950
          let block =
×
951
            Envelope.Incoming.wrap
952
              ~data:
953
                ( Transition_frontier.best_tip peer_net.state.frontier
×
954
                |> Transition_frontier.Breadcrumb.validated_transition
×
955
                |> Mina_block.Validated.remember
×
956
                |> Mina_block.Validation.reset_frontier_dependencies_validation
×
957
                |> Mina_block.Validation.reset_staged_ledger_diff_validation )
×
958
              ~sender:(Envelope.Sender.Remote peer_net.peer)
959
          in
960
          Pipe_lib.Strict_pipe.Writer.write transition_writer
961
            (`Block block, `Valid_cb None) ;
962
          let new_frontier, sorted_external_transitions =
×
963
            Async.Thread_safe.block_on_async_exn (fun () ->
964
                run_bootstrap
×
965
                  ~timeout_duration:(Block_time.Span.of_ms 30_000L)
×
966
                  ~my_net ~transition_reader )
967
          in
968
          assert_transitions_increasingly_sorted
×
969
            ~root:(Transition_frontier.root new_frontier)
×
970
            sorted_external_transitions ;
971
          [%test_result: Ledger_hash.t]
×
972
            ( Ledger.Db.merkle_root
×
973
            @@ Transition_frontier.root_snarked_ledger new_frontier )
×
974
            ~expect:
975
              ( Ledger.Db.merkle_root
×
976
              @@ Transition_frontier.root_snarked_ledger peer_net.state.frontier
×
977
              ) )
978

979
    let%test_unit "reconstruct staged_ledgers using \
980
                   of_scan_state_and_snarked_ledger" =
981
      Quickcheck.test ~trials:1
×
982
        (Transition_frontier.For_tests.gen ~precomputed_values ~verifier
×
983
           ~max_length:max_frontier_length ~size:max_frontier_length () )
984
        ~f:(fun frontier ->
985
          Thread_safe.block_on_async_exn
×
986
          @@ fun () ->
987
          Deferred.List.iter (Transition_frontier.all_breadcrumbs frontier)
×
988
            ~f:(fun breadcrumb ->
989
              let staged_ledger =
×
990
                Transition_frontier.Breadcrumb.staged_ledger breadcrumb
991
              in
992
              let expected_merkle_root =
×
993
                Staged_ledger.ledger staged_ledger |> Ledger.merkle_root
×
994
              in
995
              let snarked_ledger =
×
996
                Transition_frontier.root_snarked_ledger frontier
×
997
                |> Ledger.of_database
998
              in
999
              let snarked_local_state =
×
1000
                Transition_frontier.root frontier
×
1001
                |> Transition_frontier.Breadcrumb.protocol_state
×
1002
                |> Protocol_state.blockchain_state
×
1003
                |> Blockchain_state.snarked_local_state
1004
              in
1005
              let scan_state = Staged_ledger.scan_state staged_ledger in
×
1006
              let get_state hash =
×
1007
                match Transition_frontier.find_protocol_state frontier hash with
×
1008
                | Some protocol_state ->
×
1009
                    Ok protocol_state
1010
                | None ->
×
1011
                    Or_error.errorf
1012
                      !"Protocol state (for scan state transactions) for \
×
1013
                        %{sexp:State_hash.t} not found"
1014
                      hash
1015
              in
1016
              let pending_coinbases =
1017
                Staged_ledger.pending_coinbase_collection staged_ledger
1018
              in
1019
              let%map actual_staged_ledger =
1020
                Staged_ledger.of_scan_state_pending_coinbases_and_snarked_ledger
1021
                  ~scan_state ~logger ~verifier ~constraint_constants
1022
                  ~snarked_ledger ~snarked_local_state ~expected_merkle_root
1023
                  ~pending_coinbases ~get_state
1024
                |> Deferred.Or_error.ok_exn
×
1025
              in
1026
              let height =
×
1027
                Transition_frontier.Breadcrumb.consensus_state breadcrumb
×
1028
                |> Consensus.Data.Consensus_state.blockchain_length
×
1029
                |> Mina_numbers.Length.to_int
1030
              in
1031
              [%test_eq: Staged_ledger_hash.t]
×
1032
                ~message:
1033
                  (sprintf "mismatch of staged ledger hash height %d" height)
×
1034
                (Transition_frontier.Breadcrumb.staged_ledger_hash breadcrumb)
×
1035
                (Staged_ledger.hash actual_staged_ledger) ) )
×
1036

1037
    (*
1038
    let%test_unit "if we see a new transition that is better than the \
1039
                   transition that we are syncing from, than we should \
1040
                   retarget our root" =
1041
      Quickcheck.test ~trials:1
1042
        Fake_network.Generator.(
1043
          gen ~max_frontier_length
1044
            [ fresh_peer
1045
            ; peer_with_branch ~frontier_branch_size:max_frontier_length
1046
            ; peer_with_branch
1047
                ~frontier_branch_size:((max_frontier_length * 2) + 2) ])
1048
        ~f:(fun fake_network ->
1049
          let [me; weaker_chain; stronger_chain] =
1050
            fake_network.peer_networks
1051
          in
1052
          let transition_reader, transition_writer =
1053
            Pipe_lib.Strict_pipe.create ~name:(__MODULE__ ^ __LOC__)
1054
              (Buffered (`Capacity 10, `Overflow Drop_head))
1055
          in
1056
          Envelope.Incoming.wrap
1057
            ~data:
1058
              ( Transition_frontier.best_tip weaker_chain.state.frontier
1059
              |> Transition_frontier.Breadcrumb.validated_transition
1060
              |> Mina_block.Validated.to_initial_validated )
1061
            ~sender:
1062
              (Envelope.Sender.Remote
1063
                 (weaker_chain.peer.host, weaker_chain.peer.peer_id))
1064
          |> Pipe_lib.Strict_pipe.Writer.write transition_writer ;
1065
          Envelope.Incoming.wrap
1066
            ~data:
1067
              ( Transition_frontier.best_tip stronger_chain.state.frontier
1068
              |> Transition_frontier.Breadcrumb.validated_transition
1069
              |> Mina_block.Validated.to_initial_validated )
1070
            ~sender:
1071
              (Envelope.Sender.Remote
1072
                 (stronger_chain.peer.host, stronger_chain.peer.peer_id))
1073
          |> Pipe_lib.Strict_pipe.Writer.write transition_writer ;
1074
          let new_frontier, sorted_external_transitions =
1075
            Async.Thread_safe.block_on_async_exn (fun () ->
1076
                run_bootstrap
1077
                  ~timeout_duration:(Block_time.Span.of_ms 60_000L)
1078
                  ~my_net:me ~transition_reader )
1079
          in
1080
          assert_transitions_increasingly_sorted
1081
            ~root:(Transition_frontier.root new_frontier)
1082
            sorted_external_transitions ;
1083
          [%test_result: Ledger_hash.t]
1084
            ( Ledger.Db.merkle_root
1085
            @@ Transition_frontier.root_snarked_ledger new_frontier )
1086
            ~expect:
1087
              ( Ledger.Db.merkle_root
1088
              @@ Transition_frontier.root_snarked_ledger
1089
                   stronger_chain.state.frontier ) )
1090
*)
1091
  end )
16✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc