• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

MinaProtocol / mina / 507

19 Aug 2025 11:53PM UTC coverage: 33.38% (-28.0%) from 61.334%
507

push

buildkite

web-flow
Merge pull request #17640 from MinaProtocol/georgeee/compatible-to-develop-2025-08-19

Merge `compatible` to `develop` (19 August 2025, pt. 2)

24170 of 72408 relevant lines covered (33.38%)

24770.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.56
/src/lib/bootstrap_controller/bootstrap_controller.ml
1
(* Only show stdout for failed inline tests. *)
44✔
2
open Core
3
open Async
4
open Mina_base
5
module Ledger = Mina_ledger.Ledger
6
module Sync_ledger = Mina_ledger.Sync_ledger
7
open Mina_state
8
open Pipe_lib.Strict_pipe
9
open Network_peer
10
module Transition_cache = Transition_cache
11

12
module type CONTEXT = sig
13
  val logger : Logger.t
14

15
  val precomputed_values : Precomputed_values.t
16

17
  val constraint_constants : Genesis_constants.Constraint_constants.t
18

19
  val consensus_constants : Consensus.Constants.t
20

21
  val ledger_sync_config : Syncable_ledger.daemon_config
22

23
  val proof_cache_db : Proof_cache_tag.cache_db
24
end
25

26
type Structured_log_events.t += Bootstrap_complete
27
  [@@deriving register_event { msg = "Bootstrap state: complete." }]
×
28

29
type t =
30
  { context : (module CONTEXT)
31
  ; trust_system : Trust_system.t
32
  ; verifier : Verifier.t
33
  ; mutable best_seen_transition : Mina_block.initial_valid_block
34
  ; mutable current_root : Mina_block.initial_valid_block
35
  ; network : Mina_networking.t
36
  ; mutable num_of_root_snarked_ledger_retargeted : int
37
  }
38

39
type time = Time.Span.t
40

41
let time_to_yojson span =
42
  `String (Printf.sprintf "%f seconds" (Time.Span.to_sec span))
×
43

44
type opt_time = time option
45

46
let opt_time_to_yojson = function
47
  | Some time ->
×
48
      time_to_yojson time
49
  | None ->
×
50
      `Null
51

52
(** An auxiliary data structure for collecting various metrics for bootstrap controller. *)
53
type bootstrap_cycle_stats =
×
54
  { cycle_result : string
×
55
  ; sync_ledger_time : time
×
56
  ; staged_ledger_data_download_time : time
×
57
  ; staged_ledger_construction_time : opt_time
×
58
  ; local_state_sync_required : bool
×
59
  ; local_state_sync_time : opt_time
×
60
  }
61
[@@deriving to_yojson]
62

63
let time_deferred deferred =
64
  let start_time = Time.now () in
×
65
  let%map result = deferred in
66
  let end_time = Time.now () in
×
67
  (Time.diff end_time start_time, result)
×
68

69
let worth_getting_root ({ context = (module Context); _ } as t) candidate =
70
  let module Consensus_context = struct
×
71
    include Context
72

73
    let logger =
74
      Logger.extend logger
×
75
        [ ( "selection_context"
76
          , `String "Bootstrap_controller.worth_getting_root" )
77
        ]
78
  end in
79
  Consensus.Hooks.equal_select_status `Take
80
  @@ Consensus.Hooks.select
81
       ~context:(module Consensus_context)
82
       ~existing:
83
         ( t.best_seen_transition |> Mina_block.Validation.block_with_hash
×
84
         |> With_hash.map ~f:Mina_block.consensus_state )
×
85
       ~candidate
86

87
let received_bad_proof ({ context = (module Context); _ } as t) host e =
88
  let open Context in
×
89
  Trust_system.(
90
    record t.trust_system logger host
91
      Actions.
92
        ( Violated_protocol
93
        , Some
94
            ( "Bad ancestor proof: $error"
95
            , [ ("error", Error_json.error_to_yojson e) ] ) ))
×
96

97
let done_syncing_root root_sync_ledger =
98
  Option.is_some (Sync_ledger.Root.peek_valid_tree root_sync_ledger)
×
99

100
let should_sync ~root_sync_ledger t candidate_state =
101
  (not @@ done_syncing_root root_sync_ledger)
×
102
  && worth_getting_root t candidate_state
×
103

104
(** Update [Synced_ledger]'s target and [best_seen_transition] and [current_root] accordingly. *)
105
let start_sync_job_with_peer ~sender ~root_sync_ledger
106
    ({ context = (module Context); _ } as t) peer_best_tip peer_root =
107
  let open Context in
×
108
  let%bind () =
109
    Trust_system.(
110
      record t.trust_system logger sender
×
111
        Actions.
112
          ( Fulfilled_request
113
          , Some ("Received verified peer root and best tip", []) ))
114
  in
115
  t.best_seen_transition <- peer_best_tip ;
×
116
  t.current_root <- peer_root ;
117
  let blockchain_state =
118
    t.current_root |> Mina_block.Validation.block |> Mina_block.header
×
119
    |> Mina_block.Header.protocol_state |> Protocol_state.blockchain_state
×
120
  in
121
  let expected_staged_ledger_hash =
×
122
    blockchain_state |> Blockchain_state.staged_ledger_hash
123
  in
124
  let snarked_ledger_hash =
×
125
    blockchain_state |> Blockchain_state.snarked_ledger_hash
126
  in
127
  return
×
128
  @@
129
  match
130
    Sync_ledger.Root.new_goal root_sync_ledger
131
      (Frozen_ledger_hash.to_ledger_hash snarked_ledger_hash)
×
132
      ~data:
133
        ( State_hash.With_state_hashes.state_hash
×
134
          @@ Mina_block.Validation.block_with_hash t.current_root
×
135
        , sender
136
        , expected_staged_ledger_hash )
137
      ~equal:(fun (hash1, _, _) (hash2, _, _) -> State_hash.equal hash1 hash2)
×
138
  with
139
  | `New ->
×
140
      t.num_of_root_snarked_ledger_retargeted <-
141
        t.num_of_root_snarked_ledger_retargeted + 1 ;
142
      `Syncing_new_snarked_ledger
143
  | `Update_data ->
×
144
      `Updating_root_transition
145
  | `Repeat ->
×
146
      `Ignored
147

148
let to_consensus_state h =
149
  Mina_block.Header.protocol_state h |> Protocol_state.consensus_state
×
150

151
(** For each transition, this function would compare it with the existing one.
152
    If the incoming transition is better, then download the merkle list from
153
    that transition to its root and verify it. If we get a better root than
154
    the existing one, then reset the Sync_ledger's target by calling
155
    [start_sync_job_with_peer] function. *)
156
let on_transition ({ context = (module Context); _ } as t) ~sender
157
    ~root_sync_ledger candidate_header =
158
  let open Context in
×
159
  let candidate_consensus_state =
160
    With_hash.map ~f:to_consensus_state candidate_header
161
  in
162
  if not @@ should_sync ~root_sync_ledger t candidate_consensus_state then
×
163
    Deferred.return `Ignored
×
164
  else
165
    match%bind
166
      Mina_networking.get_ancestry t.network sender.Peer.peer_id
×
167
        (With_hash.map_hash candidate_consensus_state
×
168
           ~f:State_hash.State_hashes.state_hash )
169
    with
170
    | Error e ->
×
171
        [%log error]
×
172
          ~metadata:[ ("error", Error_json.error_to_yojson e) ]
×
173
          !"Could not get the proof of the root transition from the network: \
174
            $error" ;
175
        Deferred.return `Ignored
×
176
    | Ok peer_root_with_proof -> (
×
177
        let pcd =
178
          peer_root_with_proof.data
179
          |> Proof_carrying_data.map
×
180
               ~f:(Mina_block.write_all_proofs_to_disk ~proof_cache_db)
181
          |> Proof_carrying_data.map_proof
182
               ~f:
183
                 (Tuple2.map_snd
184
                    ~f:(Mina_block.write_all_proofs_to_disk ~proof_cache_db) )
185
        in
186
        match%bind
187
          Mina_block.verify_on_header
×
188
            ~verify:
189
              (Sync_handler.Root.verify
×
190
                 ~context:(module Context)
191
                 ~verifier:t.verifier candidate_consensus_state )
192
            pcd
193
        with
194
        | Ok (`Root root, `Best_tip best_tip) ->
×
195
            if done_syncing_root root_sync_ledger then return `Ignored
×
196
            else
197
              start_sync_job_with_peer ~sender ~root_sync_ledger t best_tip root
×
198
        | Error e ->
×
199
            return (received_bad_proof t sender e |> Fn.const `Ignored) )
×
200

201
(** A helper function that wraps the calls to Sync_ledger and iterate through
202
    incoming transitions, add those to the transition_cache and calls
203
    [on_transition] function. *)
204
let sync_ledger ({ context = (module Context); _ } as t) ~preferred
205
    ~root_sync_ledger ~transition_graph ~sync_ledger_reader =
206
  let open Context in
×
207
  let query_reader = Sync_ledger.Root.query_reader root_sync_ledger in
208
  let response_writer = Sync_ledger.Root.answer_writer root_sync_ledger in
×
209
  Mina_networking.glue_sync_ledger ~preferred t.network query_reader
×
210
    response_writer ;
211
  Pipe_lib.Choosable_synchronous_pipe.iter sync_ledger_reader
×
212
    ~f:(fun (b_or_h, `Valid_cb vc) ->
213
      let header_with_hash, sender, transition_cache_element =
×
214
        match b_or_h with
215
        | `Block b_env ->
×
216
            ( Envelope.Incoming.data b_env
×
217
              |> Mina_block.Validation.block_with_hash
×
218
              |> With_hash.map ~f:Mina_block.header
×
219
            , Envelope.Incoming.remote_sender_exn b_env
×
220
            , Envelope.Incoming.map ~f:(fun x -> `Block x) b_env )
×
221
        | `Header h_env ->
×
222
            ( Envelope.Incoming.data h_env
×
223
              |> Mina_block.Validation.header_with_hash
×
224
            , Envelope.Incoming.remote_sender_exn h_env
×
225
            , Envelope.Incoming.map ~f:(fun x -> `Header x) h_env )
×
226
      in
227
      let previous_state_hash =
228
        With_hash.data header_with_hash
×
229
        |> Mina_block.Header.protocol_state
×
230
        |> Protocol_state.previous_state_hash
231
      in
232
      Transition_cache.add transition_graph ~parent:previous_state_hash
×
233
        (transition_cache_element, vc) ;
234
      (* TODO: Efficiently limiting the number of green threads in #1337 *)
235
      if
×
236
        worth_getting_root t
237
          (With_hash.map ~f:to_consensus_state header_with_hash)
×
238
      then (
×
239
        [%log trace] "Added the transition from sync_ledger_reader into cache"
×
240
          ~metadata:
241
            [ ( "state_hash"
242
              , State_hash.to_yojson
×
243
                  (State_hash.With_state_hashes.state_hash header_with_hash) )
×
244
            ; ( "header"
245
              , Mina_block.Header.to_yojson (With_hash.data header_with_hash) )
×
246
            ] ;
247

248
        Deferred.ignore_m
×
249
        @@ on_transition t ~sender ~root_sync_ledger header_with_hash )
×
250
      else Deferred.unit )
×
251

252
let external_transition_compare ~context:(module Context : CONTEXT) =
253
  let get_consensus_state =
×
254
    Fn.compose Protocol_state.consensus_state Mina_block.Header.protocol_state
255
  in
256
  Comparable.lift
×
257
    (fun existing candidate ->
258
      (* To prevent the logger to spam a lot of messages, the logger input is set to null *)
259
      if
×
260
        State_hash.equal
261
          (State_hash.With_state_hashes.state_hash existing)
×
262
          (State_hash.With_state_hashes.state_hash candidate)
×
263
      then 0
×
264
      else if
×
265
        Consensus.Hooks.equal_select_status `Keep
266
        @@ Consensus.Hooks.select ~context:(module Context) ~existing ~candidate
267
      then -1
×
268
      else 1 )
×
269
    ~f:(With_hash.map ~f:get_consensus_state)
270

271
let download_snarked_ledger ~trust_system ~preferred_peers ~transition_graph
272
    ~sync_ledger_reader ~context t temp_snarked_ledger =
273
  time_deferred
×
274
    (let root_sync_ledger =
275
       Sync_ledger.Root.create temp_snarked_ledger ~context ~trust_system
276
     in
277
     don't_wait_for
×
278
       (sync_ledger t ~preferred:preferred_peers ~root_sync_ledger
×
279
          ~transition_graph ~sync_ledger_reader ) ;
280
     (* We ignore the resulting ledger returned here since it will always
281
        * be the same as the ledger we started with because we are syncing
282
        * a db ledger. *)
283
     let%map _, data = Sync_ledger.Root.valid_tree root_sync_ledger in
×
284
     Sync_ledger.Root.destroy root_sync_ledger ;
×
285
     data )
×
286

287
(** Run one bootstrap cycle *)
288
let run_cycle ~context:(module Context : CONTEXT) ~trust_system ~verifier
289
    ~network ~consensus_local_state ~network_transition_pipe ~preferred_peers
290
    ~persistent_root ~persistent_frontier ~initial_root_transition ~catchup_mode
291
    previous_cycles =
292
  let open Context in
×
293
  (* The short-lived pipe allocated here will be closed
294
     when a follow-up pipe is allocated: in the next cycle of bootstrap
295
     or when controll is passed to the transition frontier controller.staged_ledger_construction_time
296
     Because [Choosable_synchronous_pipe.t] is used, no data will be lost: any
297
     successful read is followed by mom-blocking handling, and if the read/write
298
     pair is not consumed, it will continue in a follow-up pipe allocated in the
299
     next call to [Swappable.swap_reader].
300
  *)
301
  let%bind sync_ledger_reader = Swappable.swap_reader network_transition_pipe in
×
302
  let initial_root_transition =
×
303
    initial_root_transition |> Mina_block.Validated.remember
×
304
    |> Mina_block.Validation.reset_frontier_dependencies_validation
×
305
    |> Mina_block.Validation.reset_staged_ledger_diff_validation
306
  in
307
  let t =
×
308
    { network
309
    ; context = (module Context)
310
    ; trust_system
311
    ; verifier
312
    ; best_seen_transition = initial_root_transition
313
    ; current_root = initial_root_transition
314
    ; num_of_root_snarked_ledger_retargeted = 0
315
    }
316
  in
317
  let transition_graph = Transition_cache.create () in
318
  let temp_persistent_root_instance =
×
319
    Transition_frontier.Persistent_root.create_instance_exn persistent_root
320
  in
321
  let temp_snarked_ledger =
×
322
    Transition_frontier.Persistent_root.Instance.snarked_ledger
323
      temp_persistent_root_instance
324
  in
325
  (* step 1. download snarked_ledger *)
326
  let%bind sync_ledger_time, (hash, sender, expected_staged_ledger_hash) =
327
    download_snarked_ledger
×
328
      ~context:(module Context)
329
      ~trust_system ~preferred_peers ~transition_graph ~sync_ledger_reader t
330
      temp_snarked_ledger
331
  in
332
  Mina_metrics.(
×
333
    Counter.inc Bootstrap.root_snarked_ledger_sync_ms
×
334
      Time.Span.(to_ms sync_ledger_time)) ;
×
335
  Mina_metrics.(
336
    Gauge.set Bootstrap.num_of_root_snarked_ledger_retargeted
×
337
      (Float.of_int t.num_of_root_snarked_ledger_retargeted)) ;
×
338
  (* step 2. Download scan state and pending coinbases. *)
339
  let%bind ( staged_ledger_data_download_time
340
           , staged_ledger_construction_time
341
           , staged_ledger_aux_result ) =
342
    let%bind ( staged_ledger_data_download_time
343
             , staged_ledger_data_download_result ) =
344
      time_deferred
×
345
        (Mina_networking.get_staged_ledger_aux_and_pending_coinbases_at_hash
×
346
           t.network sender.peer_id hash )
347
    in
348
    match staged_ledger_data_download_result with
×
349
    | Error err ->
×
350
        Deferred.return (staged_ledger_data_download_time, None, Error err)
351
    | Ok
×
352
        ( scan_state_uncached
353
        , expected_merkle_root
354
        , pending_coinbases
355
        , protocol_states ) -> (
356
        let%map staged_ledger_construction_result =
357
          O1trace.thread "construct_root_staged_ledger" (fun () ->
×
358
              let open Deferred.Or_error.Let_syntax in
×
359
              let received_staged_ledger_hash =
360
                Staged_ledger_hash.of_aux_ledger_and_coinbase_hash
361
                  (Staged_ledger.Scan_state.Stable.Latest.hash
×
362
                     scan_state_uncached )
363
                  expected_merkle_root pending_coinbases
364
              in
365
              [%log debug]
×
366
                ~metadata:
367
                  [ ( "expected_staged_ledger_hash"
368
                    , Staged_ledger_hash.to_yojson expected_staged_ledger_hash
×
369
                    )
370
                  ; ( "received_staged_ledger_hash"
371
                    , Staged_ledger_hash.to_yojson received_staged_ledger_hash
×
372
                    )
373
                  ]
374
                "Comparing $expected_staged_ledger_hash to \
375
                 $received_staged_ledger_hash" ;
376
              let%bind new_root =
377
                t.current_root
378
                |> Mina_block.Validation.skip_frontier_dependencies_validation
×
379
                     `This_block_belongs_to_a_detached_subtree
380
                |> Mina_block.Validation.validate_staged_ledger_hash
×
381
                     (`Staged_ledger_already_materialized
382
                       received_staged_ledger_hash )
383
                |> Result.map_error ~f:(fun _ ->
×
384
                       Error.of_string "received faulty scan state from peer" )
×
385
                |> Deferred.return
×
386
              in
387
              let protocol_states =
×
388
                List.map protocol_states
389
                  ~f:(With_hash.of_data ~hash_data:Protocol_state.hashes)
390
              in
391
              let scan_state =
×
392
                Staged_ledger.Scan_state.write_all_proofs_to_disk
393
                  ~proof_cache_db scan_state_uncached
394
              in
395
              let%bind protocol_states =
396
                Staged_ledger.Scan_state.check_required_protocol_states
×
397
                  scan_state ~protocol_states
398
                |> Deferred.return
×
399
              in
400
              let protocol_states_map =
×
401
                protocol_states
402
                |> List.map ~f:(fun ps ->
×
403
                       (State_hash.With_state_hashes.state_hash ps, ps) )
×
404
                |> State_hash.Map.of_alist_exn
405
              in
406
              let get_state hash =
×
407
                match Map.find protocol_states_map hash with
×
408
                | None ->
×
409
                    let new_state_hash =
410
                      State_hash.With_state_hashes.state_hash (fst new_root)
×
411
                    in
412
                    [%log error]
×
413
                      ~metadata:
414
                        [ ("new_root", State_hash.to_yojson new_state_hash)
×
415
                        ; ("state_hash", State_hash.to_yojson hash)
×
416
                        ]
417
                      "Protocol state (for scan state transactions) for \
418
                       $state_hash not found when bootstrapping to the new \
419
                       root $new_root" ;
420
                    Or_error.errorf
×
421
                      !"Protocol state (for scan state transactions) for \
×
422
                        %{sexp:State_hash.t} not found when bootstrapping to \
423
                        the new root %{sexp:State_hash.t}"
424
                      hash new_state_hash
425
                | Some protocol_state ->
×
426
                    Ok (With_hash.data protocol_state)
×
427
              in
428
              (* step 3. Construct staged ledger from snarked ledger, scan state
429
                 and pending coinbases. *)
430
              (* Construct the staged ledger before constructing the transition
431
               * frontier in order to verify the scan state we received.
432
               * TODO: reorganize the code to avoid doing this twice (#3480) *)
433
              let open Deferred.Let_syntax in
434
              let%map staged_ledger_construction_time, construction_result =
435
                time_deferred
×
436
                  (let open Deferred.Let_syntax in
437
                  let temp_mask = Ledger.Root.as_masked temp_snarked_ledger in
438
                  let%map result =
439
                    Staged_ledger
440
                    .of_scan_state_pending_coinbases_and_snarked_ledger ~logger
441
                      ~snarked_local_state:
442
                        Mina_block.(
443
                          t.current_root |> Validation.block |> header
×
444
                          |> Header.protocol_state
×
445
                          |> Protocol_state.blockchain_state
×
446
                          |> Blockchain_state.snarked_local_state)
×
447
                      ~verifier ~constraint_constants ~scan_state
448
                      ~snarked_ledger:temp_mask ~expected_merkle_root
449
                      ~pending_coinbases ~get_state
450
                  in
451
                  ignore
×
452
                    ( Ledger.Maskable.unregister_mask_exn ~loc:__LOC__ temp_mask
×
453
                      : Ledger.unattached_mask ) ;
454
                  Result.map result
455
                    ~f:
456
                      (const
×
457
                         ( scan_state
458
                         , pending_coinbases
459
                         , new_root
460
                         , protocol_states ) ))
461
              in
462
              Ok (staged_ledger_construction_time, construction_result) )
×
463
        in
464
        match staged_ledger_construction_result with
×
465
        | Error err ->
×
466
            (staged_ledger_data_download_time, None, Error err)
467
        | Ok (staged_ledger_construction_time, result) ->
×
468
            ( staged_ledger_data_download_time
469
            , Some staged_ledger_construction_time
470
            , result ) )
471
  in
472
  Transition_frontier.Persistent_root.Instance.close
×
473
    temp_persistent_root_instance ;
474
  match staged_ledger_aux_result with
×
475
  | Error e ->
×
476
      let%map () =
477
        Trust_system.(
478
          record t.trust_system logger sender
×
479
            Actions.
480
              ( Outgoing_connection_error
481
              , Some
482
                  ( "Can't find scan state from the peer or received faulty \
483
                     scan state from the peer."
484
                  , [] ) ))
485
      in
486
      [%log error]
×
487
        ~metadata:
488
          [ ("error", Error_json.error_to_yojson e)
×
489
          ; ("state_hash", State_hash.to_yojson hash)
×
490
          ; ( "expected_staged_ledger_hash"
491
            , Staged_ledger_hash.to_yojson expected_staged_ledger_hash )
×
492
          ]
493
        "Failed to find scan state for the transition with hash $state_hash \
494
         from the peer or received faulty scan state: $error. Retry bootstrap" ;
495
      let this_cycle =
×
496
        { cycle_result = "failed to download and construct scan state"
497
        ; sync_ledger_time
498
        ; staged_ledger_data_download_time
499
        ; staged_ledger_construction_time
500
        ; local_state_sync_required = false
501
        ; local_state_sync_time = None
502
        }
503
      in
504
      `Repeat (this_cycle :: previous_cycles)
505
  | Ok (scan_state, pending_coinbase, new_root, protocol_states) -> (
×
506
      let%bind () =
507
        Trust_system.(
508
          record t.trust_system logger sender
×
509
            Actions.
510
              ( Fulfilled_request
511
              , Some ("Received valid scan state from peer", []) ))
512
      in
513
      let best_seen_block_with_hash, _ = t.best_seen_transition in
×
514
      let consensus_state =
515
        With_hash.data best_seen_block_with_hash
×
516
        |> Mina_block.header |> Mina_block.Header.protocol_state
×
517
        |> Protocol_state.consensus_state
518
      in
519
      (* step 4. Synchronize consensus local state if necessary *)
520
      let%bind ( local_state_sync_time
521
               , (local_state_sync_required, local_state_sync_result) ) =
522
        time_deferred
×
523
          ( match
524
              Consensus.Hooks.required_local_state_sync
525
                ~constants:precomputed_values.consensus_constants
526
                ~consensus_state ~local_state:consensus_local_state
527
            with
528
          | None ->
×
529
              [%log debug]
×
530
                ~metadata:
531
                  [ ( "local_state"
532
                    , Consensus.Data.Local_state.to_yojson consensus_local_state
×
533
                    )
534
                  ; ( "consensus_state"
535
                    , Consensus.Data.Consensus_state.Value.to_yojson
×
536
                        consensus_state )
537
                  ]
538
                "Not synchronizing consensus local state" ;
539
              Deferred.return (false, Or_error.return ())
×
540
          | Some sync_jobs ->
×
541
              [%log info] "Synchronizing consensus local state" ;
×
542
              let%map result =
543
                Consensus.Hooks.sync_local_state
×
544
                  ~context:(module Context)
545
                  ~local_state:consensus_local_state ~trust_system
546
                  ~glue_sync_ledger:(Mina_networking.glue_sync_ledger t.network)
×
547
                  sync_jobs
548
              in
549
              (true, result) )
×
550
      in
551
      match local_state_sync_result with
×
552
      | Error e ->
×
553
          [%log error]
×
554
            ~metadata:[ ("error", Error_json.error_to_yojson e) ]
×
555
            "Local state sync failed: $error. Retry bootstrap" ;
556
          let this_cycle =
×
557
            { cycle_result = "failed to synchronize local state"
558
            ; sync_ledger_time
559
            ; staged_ledger_data_download_time
560
            ; staged_ledger_construction_time
561
            ; local_state_sync_required
562
            ; local_state_sync_time = Some local_state_sync_time
563
            }
564
          in
565
          Deferred.return (`Repeat (this_cycle :: previous_cycles))
566
      | Ok () ->
×
567
          (* step 5. Close the old frontier and reload a new one from disk. *)
568
          let new_root_data : Transition_frontier.Root_data.Limited.t =
569
            Transition_frontier.Root_data.Limited.create
570
              ~transition:(Mina_block.Validated.lift new_root)
×
571
              ~scan_state ~pending_coinbase ~protocol_states
572
          in
573
          let%bind () =
574
            Transition_frontier.Persistent_frontier.reset_database_exn
×
575
              persistent_frontier ~root_data:new_root_data
576
              ~genesis_state_hash:
577
                (State_hash.With_state_hashes.state_hash
×
578
                   precomputed_values.protocol_state_with_hashes )
579
          in
580
          (* TODO: lazy load db in persistent root to avoid unnecessary opens like this *)
581
          Transition_frontier.Persistent_root.(
×
582
            with_instance_exn persistent_root ~f:(fun instance ->
×
583
                Instance.set_root_state_hash instance
×
584
                @@ Mina_block.Validated.state_hash
×
585
                @@ Mina_block.Validated.lift new_root )) ;
×
586
          let%map new_frontier =
587
            let fail msg =
588
              failwith
×
589
                ( "failed to initialize transition frontier after \
590
                   bootstrapping: " ^ msg )
591
            in
592
            Transition_frontier.load
×
593
              ~context:(module Context)
594
              ~retry_with_fresh_db:false ~verifier ~consensus_local_state
595
              ~persistent_root ~persistent_frontier ~catchup_mode ()
596
            >>| function
×
597
            | Ok frontier ->
×
598
                frontier
599
            | Error (`Failure msg) ->
×
600
                fail msg
601
            | Error `Bootstrap_required ->
×
602
                fail
603
                  "bootstrap still required (indicates logical error in code)"
604
            | Error `Persistent_frontier_malformed ->
×
605
                fail "persistent frontier was malformed"
606
            | Error `Snarked_ledger_mismatch ->
×
607
                fail
608
                  "this should not happen, because we just reset the \
609
                   snarked_ledger"
610
          in
611
          [%str_log info] Bootstrap_complete ;
×
612
          let collected_transitions = Transition_cache.data transition_graph in
×
613
          let logger =
×
614
            Logger.extend logger
615
              [ ("context", `String "Filter collected transitions in bootstrap")
616
              ]
617
          in
618
          let root_consensus_state =
×
619
            Transition_frontier.(
620
              Breadcrumb.consensus_state_with_hashes (root new_frontier))
×
621
          in
622
          let filtered_collected_transitions =
623
            List.filter collected_transitions
624
              ~f:(fun (incoming_transition, _) ->
625
                let transition =
×
626
                  Envelope.Incoming.data incoming_transition
×
627
                  |> Transition_cache.header_with_hash
628
                in
629
                Consensus.Hooks.equal_select_status `Take
×
630
                @@ Consensus.Hooks.select
631
                     ~context:(module Context)
632
                     ~existing:root_consensus_state
633
                     ~candidate:
634
                       (With_hash.map
×
635
                          ~f:
636
                            (Fn.compose Protocol_state.consensus_state
×
637
                               Mina_block.Header.protocol_state )
638
                          transition ) )
639
          in
640
          [%log debug] "Sorting filtered transitions by consensus state"
×
641
            ~metadata:[] ;
642
          let sorted_filtered_collected_transitions =
×
643
            O1trace.sync_thread "sorting_collected_transitions" (fun () ->
644
                List.sort filtered_collected_transitions
×
645
                  ~compare:
646
                    (Comparable.lift
×
647
                       ~f:(fun (x, _) ->
648
                         Transition_cache.header_with_hash
×
649
                         @@ Envelope.Incoming.data x )
×
650
                       (external_transition_compare ~context:(module Context)) ) )
651
          in
652
          let this_cycle =
×
653
            { cycle_result = "success"
654
            ; sync_ledger_time
655
            ; staged_ledger_data_download_time
656
            ; staged_ledger_construction_time
657
            ; local_state_sync_required
658
            ; local_state_sync_time = Some local_state_sync_time
659
            }
660
          in
661
          `Finished
662
            ( this_cycle :: previous_cycles
663
            , (new_frontier, sorted_filtered_collected_transitions) ) )
664

665
(** The entry point function for bootstrap controller. When bootstrap finished
666
    it would return a transition frontier with the root breadcrumb and a list
667
    of transitions collected during bootstrap.
668

669
    Bootstrap controller would do the following steps to contrust the
670
    transition frontier:
671
    1. Download the root snarked_ledger.
672
    2. Download the scan state and pending coinbases.
673
    3. Construct the staged ledger from the snarked ledger, scan state and
674
       pending coinbases.
675
    4. Synchronize the consensus local state if necessary.
676
    5. Close the old frontier and reload a new one from disk.
677
 *)
678
let run ~context:(module Context : CONTEXT) ~trust_system ~verifier ~network
679
    ~consensus_local_state ~network_transition_pipe ~preferred_peers
680
    ~persistent_root ~persistent_frontier ~initial_root_transition ~catchup_mode
681
    =
682
  let open Context in
×
683
  let run_cycle =
684
    run_cycle
685
      ~context:(module Context : CONTEXT)
686
      ~trust_system ~verifier ~network ~consensus_local_state
687
      ~network_transition_pipe ~preferred_peers ~persistent_root
688
      ~persistent_frontier ~initial_root_transition ~catchup_mode
689
  in
690
  O1trace.thread "bootstrap"
691
  @@ fun () ->
692
  let%map time_elapsed, (cycles, result) =
693
    time_deferred @@ Deferred.repeat_until_finished [] run_cycle
×
694
  in
695
  [%log info] "Bootstrap completed in $time_elapsed: $bootstrap_stats"
×
696
    ~metadata:
697
      [ ("time_elapsed", time_to_yojson time_elapsed)
×
698
      ; ( "bootstrap_stats"
699
        , `List (List.map ~f:bootstrap_cycle_stats_to_yojson cycles) )
×
700
      ] ;
701
  Mina_metrics.(
×
702
    Gauge.set Bootstrap.bootstrap_time_ms Core.Time.(Span.to_ms @@ time_elapsed)) ;
×
703
  result
704

705
let%test_module "Bootstrap_controller tests" =
706
  ( module struct
707
    let max_frontier_length =
708
      Transition_frontier.global_max_length Genesis_constants.For_unit_tests.t
×
709

710
    let logger = Logger.null ()
×
711

712
    let () =
713
      (* Disable log messages from best_tip_diff logger. *)
714
      Logger.Consumer_registry.register ~commit_id:""
×
715
        ~id:Logger.Logger_id.best_tip_diff ~processor:(Logger.Processor.raw ())
×
716
        ~transport:
717
          (Logger.Transport.create
×
718
             ( module struct
719
               type t = unit
720

721
               let transport () _ = ()
×
722
             end )
723
             () )
724
        ()
725

726
    let trust_system =
727
      let s = Trust_system.null () in
728
      don't_wait_for
×
729
        (Pipe_lib.Strict_pipe.Reader.iter
×
730
           (Trust_system.upcall_pipe s)
×
731
           ~f:(const Deferred.unit) ) ;
×
732
      s
×
733

734
    let precomputed_values = Lazy.force Precomputed_values.for_unit_tests
×
735

736
    let proof_level = precomputed_values.proof_level
737

738
    let constraint_constants = precomputed_values.constraint_constants
739

740
    let ledger_sync_config =
741
      Syncable_ledger.create_config
×
742
        ~compile_config:Mina_compile_config.For_unit_tests.t
743
        ~max_subtree_depth:None ~default_subtree_depth:None ()
744

745
    module Context = struct
746
      let logger = logger
747

748
      let precomputed_values = precomputed_values
749

750
      let constraint_constants =
751
        Genesis_constants.For_unit_tests.Constraint_constants.t
752

753
      let consensus_constants = precomputed_values.consensus_constants
754

755
      let ledger_sync_config =
756
        Syncable_ledger.create_config
×
757
          ~compile_config:Mina_compile_config.For_unit_tests.t
758
          ~max_subtree_depth:None ~default_subtree_depth:None ()
759

760
      let proof_cache_db = Proof_cache_tag.For_tests.create_db ()
×
761
    end
762

763
    let verifier =
764
      Async.Thread_safe.block_on_async_exn (fun () ->
×
765
          Verifier.For_tests.default ~constraint_constants ~logger ~proof_level
×
766
            () )
767

768
    module Genesis_ledger = (val precomputed_values.genesis_ledger)
769

770
    let downcast_transition ~sender transition =
771
      let transition =
×
772
        transition |> Mina_block.Validated.remember
×
773
        |> Mina_block.Validation.reset_frontier_dependencies_validation
×
774
        |> Mina_block.Validation.reset_staged_ledger_diff_validation
775
      in
776
      Envelope.Incoming.wrap ~data:transition
×
777
        ~sender:(Envelope.Sender.Remote sender)
778

779
    let downcast_breadcrumb ~sender breadcrumb =
780
      downcast_transition ~sender
×
781
        (Transition_frontier.Breadcrumb.validated_transition breadcrumb)
×
782

783
    let make_non_running_bootstrap ~genesis_root ~network =
784
      let transition =
×
785
        genesis_root
786
        |> Mina_block.Validation.reset_frontier_dependencies_validation
×
787
        |> Mina_block.Validation.reset_staged_ledger_diff_validation
788
      in
789
      { context = (module Context)
×
790
      ; trust_system
791
      ; verifier
792
      ; best_seen_transition = transition
793
      ; current_root = transition
794
      ; network
795
      ; num_of_root_snarked_ledger_retargeted = 0
796
      }
797

798
    let%test_unit "Bootstrap controller caches all transitions it is passed \
799
                   through the transition_reader" =
800
      let branch_size = (max_frontier_length * 2) + 2 in
×
801
      Quickcheck.test ~trials:1
802
        (let open Quickcheck.Generator.Let_syntax in
803
        (* we only need one node for this test, but we need more than one peer so that mina_networking does not throw an error *)
804
        let%bind fake_network =
805
          Fake_network.Generator.(
806
            gen ~precomputed_values ~verifier ~max_frontier_length
×
807
              ~ledger_sync_config [ fresh_peer; fresh_peer ])
808
        in
809
        let%map make_branch =
810
          Transition_frontier.Breadcrumb.For_tests.gen_seq ~precomputed_values
×
811
            ~verifier
812
            ~accounts_with_secret_keys:(Lazy.force Genesis_ledger.accounts)
×
813
            branch_size
814
        in
815
        let [ me; _ ] = fake_network.peer_networks in
×
816
        let branch =
817
          Async.Thread_safe.block_on_async_exn (fun () ->
818
              make_branch (Transition_frontier.root me.state.frontier) )
×
819
        in
820
        (fake_network, branch))
×
821
        ~f:(fun (fake_network, branch) ->
822
          let [ me; other ] = fake_network.peer_networks in
×
823
          let genesis_root =
824
            Transition_frontier.(
825
              Breadcrumb.validated_transition @@ root me.state.frontier)
×
826
            |> Mina_block.Validated.remember
827
          in
828
          let transition_graph = Transition_cache.create () in
×
829
          let sync_ledger_reader, sync_ledger_writer =
×
830
            Pipe_lib.Choosable_synchronous_pipe.create ()
831
          in
832
          let bootstrap =
×
833
            make_non_running_bootstrap ~genesis_root ~network:me.network
834
          in
835
          let root_sync_ledger =
836
            Sync_ledger.Root.create
837
              (Transition_frontier.root_snarked_ledger me.state.frontier)
×
838
              ~context:(module Context)
839
              ~trust_system
840
          in
841
          Async.Thread_safe.block_on_async_exn (fun () ->
×
842
              let sync_deferred =
×
843
                sync_ledger bootstrap ~root_sync_ledger ~transition_graph
844
                  ~preferred:[] ~sync_ledger_reader
845
              in
846
              let%bind sync_ledger_writer' =
847
                Deferred.List.fold ~init:sync_ledger_writer branch
×
848
                  ~f:(fun sync_ledger_writer breadcrumb ->
849
                    Pipe_lib.Choosable_synchronous_pipe.write sync_ledger_writer
×
850
                      ( `Block
851
                          (downcast_breadcrumb ~sender:other.peer breadcrumb)
×
852
                      , `Valid_cb None ) )
853
              in
854
              Pipe_lib.Choosable_synchronous_pipe.close sync_ledger_writer' ;
×
855
              sync_deferred ) ;
×
856
          let expected_transitions =
×
857
            List.map branch
858
              ~f:
859
                (Fn.compose
×
860
                   (With_hash.map ~f:Mina_block.header)
861
                   (Fn.compose Mina_block.Validation.block_with_hash
×
862
                      (Fn.compose Mina_block.Validated.remember
×
863
                         Transition_frontier.Breadcrumb.validated_transition ) ) )
864
          in
865
          let saved_transitions =
×
866
            Transition_cache.data transition_graph
×
867
            |> List.map ~f:(fun (x, _) ->
868
                   Transition_cache.header_with_hash @@ Envelope.Incoming.data x )
×
869
          in
870
          let module E = struct
×
871
            module T = struct
872
              type t = Mina_block.Header.t State_hash.With_state_hashes.t
×
873
              [@@deriving sexp]
874

875
              let compare = external_transition_compare ~context:(module Context)
876
            end
877

878
            include Comparable.Make (T)
879
          end in
880
          [%test_result: E.Set.t]
×
881
            (E.Set.of_list saved_transitions)
×
882
            ~expect:(E.Set.of_list expected_transitions) )
×
883

884
    let run_bootstrap ~timeout_duration ~my_net ~network_transition_pipe =
885
      let open Fake_network in
×
886
      let time_controller = Block_time.Controller.basic ~logger in
887
      let persistent_root =
888
        Transition_frontier.persistent_root my_net.state.frontier
889
      in
890
      let persistent_frontier =
×
891
        Transition_frontier.persistent_frontier my_net.state.frontier
892
      in
893
      let initial_root_transition =
×
894
        Transition_frontier.(
895
          Breadcrumb.validated_transition (root my_net.state.frontier))
×
896
      in
897
      let%bind () =
898
        Transition_frontier.close ~loc:__LOC__ my_net.state.frontier
×
899
      in
900
      [%log info] "bootstrap begin" ;
×
901
      Block_time.Timeout.await_exn time_controller ~timeout_duration
×
902
        (run
903
           ~context:(module Context)
904
           ~trust_system ~verifier ~network:my_net.network ~preferred_peers:[]
905
           ~consensus_local_state:my_net.state.consensus_local_state
906
           ~network_transition_pipe ~persistent_root ~persistent_frontier
907
           ~catchup_mode:`Super ~initial_root_transition )
908

909
    let assert_transitions_increasingly_sorted ~root
910
        (incoming_transitions : Transition_cache.element list) =
911
      let root =
×
912
        Transition_frontier.Breadcrumb.block root |> Mina_block.header
×
913
      in
914
      ignore
×
915
        ( List.fold_result ~init:root incoming_transitions
×
916
            ~f:(fun max_acc incoming_transition ->
917
              let With_hash.{ data = header; _ } =
×
918
                Transition_cache.header_with_hash
919
                  (Envelope.Incoming.data @@ fst incoming_transition)
×
920
              in
921
              let header_len h =
×
922
                Mina_block.Header.protocol_state h
×
923
                |> Protocol_state.consensus_state
×
924
                |> Consensus.Data.Consensus_state.blockchain_length
925
              in
926
              let open Result.Let_syntax in
927
              let%map () =
928
                Result.ok_if_true
×
929
                  Mina_numbers.Length.(header_len max_acc <= header_len header)
×
930
                  ~error:
931
                    (Error.of_string
×
932
                       "The blocks are not sorted in increasing order" )
933
              in
934
              header )
×
935
          |> Or_error.ok_exn
×
936
          : Mina_block.Header.t )
937

938
    let%test_unit "sync with one node after receiving a transition" =
939
      Quickcheck.test ~trials:1
×
940
        Fake_network.Generator.(
941
          gen ~precomputed_values ~verifier ~max_frontier_length
×
942
            ~ledger_sync_config
943
            [ fresh_peer
944
            ; peer_with_branch
945
                ~frontier_branch_size:((max_frontier_length * 2) + 2)
946
            ])
947
        ~f:(fun fake_network ->
948
          let [ my_net; peer_net ] = fake_network.peer_networks in
×
949
          let block =
950
            Envelope.Incoming.wrap
951
              ~data:
952
                ( Transition_frontier.best_tip peer_net.state.frontier
×
953
                |> Transition_frontier.Breadcrumb.validated_transition
×
954
                |> Mina_block.Validated.remember
×
955
                |> Mina_block.Validation.reset_frontier_dependencies_validation
×
956
                |> Mina_block.Validation.reset_staged_ledger_diff_validation )
×
957
              ~sender:(Envelope.Sender.Remote peer_net.peer)
958
          in
959
          let network_transition_pipe =
960
            Swappable.create ~name:(__MODULE__ ^ __LOC__)
961
              (Buffered (`Capacity 10, `Overflow (Drop_head ignore)))
962
          in
963
          Swappable.write network_transition_pipe (`Block block, `Valid_cb None) ;
×
964
          let new_frontier, sorted_external_transitions =
×
965
            Async.Thread_safe.block_on_async_exn (fun () ->
966
                run_bootstrap
×
967
                  ~timeout_duration:(Block_time.Span.of_ms 30_000L)
×
968
                  ~my_net ~network_transition_pipe )
969
          in
970
          assert_transitions_increasingly_sorted
×
971
            ~root:(Transition_frontier.root new_frontier)
×
972
            sorted_external_transitions ;
973
          [%test_result: Ledger_hash.t]
×
974
            ( Ledger.Root.merkle_root
×
975
            @@ Transition_frontier.root_snarked_ledger new_frontier )
×
976
            ~expect:
977
              ( Ledger.Root.merkle_root
×
978
              @@ Transition_frontier.root_snarked_ledger peer_net.state.frontier
×
979
              ) )
980

981
    let%test_unit "reconstruct staged_ledgers using \
982
                   of_scan_state_and_snarked_ledger" =
983
      Quickcheck.test ~trials:1
×
984
        (Transition_frontier.For_tests.gen ~precomputed_values ~verifier
×
985
           ~max_length:max_frontier_length ~size:max_frontier_length () )
986
        ~f:(fun frontier ->
987
          Thread_safe.block_on_async_exn
×
988
          @@ fun () ->
989
          Deferred.List.iter (Transition_frontier.all_breadcrumbs frontier)
×
990
            ~f:(fun breadcrumb ->
991
              let staged_ledger =
×
992
                Transition_frontier.Breadcrumb.staged_ledger breadcrumb
993
              in
994
              let expected_merkle_root =
×
995
                Staged_ledger.ledger staged_ledger |> Ledger.merkle_root
×
996
              in
997
              let snarked_ledger =
×
998
                Transition_frontier.root_snarked_ledger frontier
×
999
                |> Ledger.Root.as_masked
1000
              in
1001
              let snarked_local_state =
×
1002
                Transition_frontier.root frontier
×
1003
                |> Transition_frontier.Breadcrumb.protocol_state
×
1004
                |> Protocol_state.blockchain_state
×
1005
                |> Blockchain_state.snarked_local_state
1006
              in
1007
              let scan_state = Staged_ledger.scan_state staged_ledger in
×
1008
              let get_state hash =
×
1009
                match Transition_frontier.find_protocol_state frontier hash with
×
1010
                | Some protocol_state ->
×
1011
                    Ok protocol_state
1012
                | None ->
×
1013
                    Or_error.errorf
1014
                      !"Protocol state (for scan state transactions) for \
×
1015
                        %{sexp:State_hash.t} not found"
1016
                      hash
1017
              in
1018
              let pending_coinbases =
1019
                Staged_ledger.pending_coinbase_collection staged_ledger
1020
              in
1021
              let%map actual_staged_ledger =
1022
                Staged_ledger.of_scan_state_pending_coinbases_and_snarked_ledger
1023
                  ~scan_state ~logger ~verifier ~constraint_constants
1024
                  ~snarked_ledger ~snarked_local_state ~expected_merkle_root
1025
                  ~pending_coinbases ~get_state
1026
                |> Deferred.Or_error.ok_exn
×
1027
              in
1028
              let height =
×
1029
                Transition_frontier.Breadcrumb.consensus_state breadcrumb
×
1030
                |> Consensus.Data.Consensus_state.blockchain_length
×
1031
                |> Mina_numbers.Length.to_int
1032
              in
1033
              [%test_eq: Staged_ledger_hash.t]
×
1034
                ~message:
1035
                  (sprintf "mismatch of staged ledger hash height %d" height)
×
1036
                (Transition_frontier.Breadcrumb.staged_ledger_hash breadcrumb)
×
1037
                (Staged_ledger.hash actual_staged_ledger) ) )
×
1038

1039
    (*
1040
    let%test_unit "if we see a new transition that is better than the \
1041
                   transition that we are syncing from, than we should \
1042
                   retarget our root" =
1043
      Quickcheck.test ~trials:1
1044
        Fake_network.Generator.(
1045
          gen ~max_frontier_length
1046
            [ fresh_peer
1047
            ; peer_with_branch ~frontier_branch_size:max_frontier_length
1048
            ; peer_with_branch
1049
                ~frontier_branch_size:((max_frontier_length * 2) + 2) ])
1050
        ~f:(fun fake_network ->
1051
          let [me; weaker_chain; stronger_chain] =
1052
            fake_network.peer_networks
1053
          in
1054
          let transition_reader, transition_writer =
1055
            Pipe_lib.Strict_pipe.create ~name:(__MODULE__ ^ __LOC__)
1056
              (Buffered (`Capacity 10, `Overflow Drop_head))
1057
          in
1058
          Envelope.Incoming.wrap
1059
            ~data:
1060
              ( Transition_frontier.best_tip weaker_chain.state.frontier
1061
              |> Transition_frontier.Breadcrumb.validated_transition
1062
              |> Mina_block.Validated.to_initial_validated )
1063
            ~sender:
1064
              (Envelope.Sender.Remote
1065
                 (weaker_chain.peer.host, weaker_chain.peer.peer_id))
1066
          |> Pipe_lib.Strict_pipe.Writer.write transition_writer ;
1067
          Envelope.Incoming.wrap
1068
            ~data:
1069
              ( Transition_frontier.best_tip stronger_chain.state.frontier
1070
              |> Transition_frontier.Breadcrumb.validated_transition
1071
              |> Mina_block.Validated.to_initial_validated )
1072
            ~sender:
1073
              (Envelope.Sender.Remote
1074
                 (stronger_chain.peer.host, stronger_chain.peer.peer_id))
1075
          |> Pipe_lib.Strict_pipe.Writer.write transition_writer ;
1076
          let new_frontier, sorted_external_transitions =
1077
            Async.Thread_safe.block_on_async_exn (fun () ->
1078
                run_bootstrap
1079
                  ~timeout_duration:(Block_time.Span.of_ms 60_000L)
1080
                  ~my_net:me ~transition_reader )
1081
          in
1082
          assert_transitions_increasingly_sorted
1083
            ~root:(Transition_frontier.root new_frontier)
1084
            sorted_external_transitions ;
1085
          [%test_result: Ledger_hash.t]
1086
            ( Ledger.Db.merkle_root
1087
            @@ Transition_frontier.root_snarked_ledger new_frontier )
1088
            ~expect:
1089
              ( Ledger.Db.merkle_root
1090
              @@ Transition_frontier.root_snarked_ledger
1091
                   stronger_chain.state.frontier ) )
1092
*)
1093
  end )
88✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc