• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

MinaProtocol / mina / 581

11 Sep 2025 07:15PM UTC coverage: 32.077% (-2.2%) from 34.248%
581

push

buildkite

web-flow
Merge pull request #17778 from MinaProtocol/dkijania/publish_mina_logproc

[CI] Publish logproc in nightly

23185 of 72279 relevant lines covered (32.08%)

24836.8 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.56
/src/lib/bootstrap_controller/bootstrap_controller.ml
1
(* Only show stdout for failed inline tests. *)
44✔
2
open Core
3
open Async
4
open Mina_base
5
module Ledger = Mina_ledger.Ledger
6
module Sync_ledger = Mina_ledger.Sync_ledger
7
open Mina_state
8
open Pipe_lib.Strict_pipe
9
open Network_peer
10
module Transition_cache = Transition_cache
11

12
module type CONTEXT = sig
13
  val logger : Logger.t
14

15
  val precomputed_values : Precomputed_values.t
16

17
  val constraint_constants : Genesis_constants.Constraint_constants.t
18

19
  val consensus_constants : Consensus.Constants.t
20

21
  val ledger_sync_config : Syncable_ledger.daemon_config
22

23
  val proof_cache_db : Proof_cache_tag.cache_db
24
end
25

26
type Structured_log_events.t += Bootstrap_complete
27
  [@@deriving register_event { msg = "Bootstrap state: complete." }]
×
28

29
type t =
30
  { context : (module CONTEXT)
31
  ; trust_system : Trust_system.t
32
  ; verifier : Verifier.t
33
  ; mutable best_seen_transition : Mina_block.initial_valid_block
34
  ; mutable current_root : Mina_block.initial_valid_block
35
  ; network : Mina_networking.t
36
  ; mutable num_of_root_snarked_ledger_retargeted : int
37
  }
38

39
type time = Time.Span.t
40

41
let time_to_yojson span =
42
  `String (Printf.sprintf "%f seconds" (Time.Span.to_sec span))
×
43

44
type opt_time = time option
45

46
let opt_time_to_yojson = function
47
  | Some time ->
×
48
      time_to_yojson time
49
  | None ->
×
50
      `Null
51

52
(** An auxiliary data structure for collecting various metrics for bootstrap controller. *)
53
type bootstrap_cycle_stats =
×
54
  { cycle_result : string
×
55
  ; sync_ledger_time : time
×
56
  ; staged_ledger_data_download_time : time
×
57
  ; staged_ledger_construction_time : opt_time
×
58
  ; local_state_sync_required : bool
×
59
  ; local_state_sync_time : opt_time
×
60
  }
61
[@@deriving to_yojson]
62

63
let time_deferred deferred =
64
  let start_time = Time.now () in
×
65
  let%map result = deferred in
66
  let end_time = Time.now () in
×
67
  (Time.diff end_time start_time, result)
×
68

69
let worth_getting_root ({ context = (module Context); _ } as t) candidate =
70
  let module Consensus_context = struct
×
71
    include Context
72

73
    let logger =
74
      Logger.extend logger
×
75
        [ ( "selection_context"
76
          , `String "Bootstrap_controller.worth_getting_root" )
77
        ]
78
  end in
79
  Consensus.Hooks.equal_select_status `Take
80
  @@ Consensus.Hooks.select
81
       ~context:(module Consensus_context)
82
       ~existing:
83
         ( t.best_seen_transition |> Mina_block.Validation.block_with_hash
×
84
         |> With_hash.map ~f:Mina_block.consensus_state )
×
85
       ~candidate
86

87
let received_bad_proof ({ context = (module Context); _ } as t) host e =
88
  let open Context in
×
89
  Trust_system.(
90
    record t.trust_system logger host
91
      Actions.
92
        ( Violated_protocol
93
        , Some
94
            ( "Bad ancestor proof: $error"
95
            , [ ("error", Error_json.error_to_yojson e) ] ) ))
×
96

97
let done_syncing_root root_sync_ledger =
98
  Option.is_some (Sync_ledger.Root.peek_valid_tree root_sync_ledger)
×
99

100
let should_sync ~root_sync_ledger t candidate_state =
101
  (not @@ done_syncing_root root_sync_ledger)
×
102
  && worth_getting_root t candidate_state
×
103

104
(** Update [Sync_ledger]'s target and [best_seen_transition] and [current_root] accordingly. *)
105
let start_sync_job_with_peer ~sender ~root_sync_ledger
106
    ({ context = (module Context); _ } as t) peer_best_tip peer_root =
107
  let open Context in
×
108
  let%bind () =
109
    Trust_system.(
110
      record t.trust_system logger sender
×
111
        Actions.
112
          ( Fulfilled_request
113
          , Some ("Received verified peer root and best tip", []) ))
114
  in
115
  t.best_seen_transition <- peer_best_tip ;
×
116
  t.current_root <- peer_root ;
117
  let blockchain_state =
118
    t.current_root |> Mina_block.Validation.block |> Mina_block.header
×
119
    |> Mina_block.Header.protocol_state |> Protocol_state.blockchain_state
×
120
  in
121
  let expected_staged_ledger_hash =
×
122
    blockchain_state |> Blockchain_state.staged_ledger_hash
123
  in
124
  let snarked_ledger_hash =
×
125
    blockchain_state |> Blockchain_state.snarked_ledger_hash
126
  in
127
  return
×
128
  @@
129
  match
130
    Sync_ledger.Root.new_goal root_sync_ledger
131
      (Frozen_ledger_hash.to_ledger_hash snarked_ledger_hash)
×
132
      ~data:
133
        ( State_hash.With_state_hashes.state_hash
×
134
          @@ Mina_block.Validation.block_with_hash t.current_root
×
135
        , sender
136
        , expected_staged_ledger_hash )
137
      ~equal:(fun (hash1, _, _) (hash2, _, _) -> State_hash.equal hash1 hash2)
×
138
  with
139
  | `New ->
×
140
      t.num_of_root_snarked_ledger_retargeted <-
141
        t.num_of_root_snarked_ledger_retargeted + 1 ;
142
      `Syncing_new_snarked_ledger
143
  | `Update_data ->
×
144
      `Updating_root_transition
145
  | `Repeat ->
×
146
      `Ignored
147

148
let to_consensus_state h =
149
  Mina_block.Header.protocol_state h |> Protocol_state.consensus_state
×
150

151
(** For each transition, this function would compare it with the existing one.
152
    If the incoming transition is better, then download the merkle list from
153
    that transition to its root and verify it. If we get a better root than
154
    the existing one, then reset the Sync_ledger's target by calling
155
    [start_sync_job_with_peer] function. *)
156
let on_transition ({ context = (module Context); _ } as t) ~sender
157
    ~root_sync_ledger candidate_header =
158
  let open Context in
×
159
  let candidate_consensus_state =
160
    With_hash.map ~f:to_consensus_state candidate_header
161
  in
162
  if not @@ should_sync ~root_sync_ledger t candidate_consensus_state then
×
163
    Deferred.return `Ignored
×
164
  else
165
    match%bind
166
      Mina_networking.get_ancestry t.network sender.Peer.peer_id
×
167
        (With_hash.map_hash candidate_consensus_state
×
168
           ~f:State_hash.State_hashes.state_hash )
169
    with
170
    | Error e ->
×
171
        [%log error]
×
172
          ~metadata:[ ("error", Error_json.error_to_yojson e) ]
×
173
          !"Could not get the proof of the root transition from the network: \
174
            $error" ;
175
        Deferred.return `Ignored
×
176
    | Ok peer_root_with_proof -> (
×
177
        let pcd =
178
          peer_root_with_proof.data
179
          |> Proof_carrying_data.map
×
180
               ~f:(Mina_block.write_all_proofs_to_disk ~proof_cache_db)
181
          |> Proof_carrying_data.map_proof
182
               ~f:
183
                 (Tuple2.map_snd
184
                    ~f:(Mina_block.write_all_proofs_to_disk ~proof_cache_db) )
185
        in
186
        match%bind
187
          Mina_block.verify_on_header
×
188
            ~verify:
189
              (Sync_handler.Root.verify
×
190
                 ~context:(module Context)
191
                 ~verifier:t.verifier candidate_consensus_state )
192
            pcd
193
        with
194
        | Ok (`Root root, `Best_tip best_tip) ->
×
195
            if done_syncing_root root_sync_ledger then return `Ignored
×
196
            else
197
              start_sync_job_with_peer ~sender ~root_sync_ledger t best_tip root
×
198
        | Error e ->
×
199
            return (received_bad_proof t sender e |> Fn.const `Ignored) )
×
200

201
(** A helper function that wraps the calls to Sync_ledger and iterates through
202
    incoming transitions, adds those to the transition_cache and calls
203
    [on_transition] function. *)
204
let sync_ledger ({ context = (module Context); _ } as t) ~preferred
205
    ~root_sync_ledger ~transition_graph ~sync_ledger_reader =
206
  let open Context in
×
207
  let query_reader = Sync_ledger.Root.query_reader root_sync_ledger in
208
  let response_writer = Sync_ledger.Root.answer_writer root_sync_ledger in
×
209
  Mina_networking.glue_sync_ledger ~preferred t.network query_reader
×
210
    response_writer ;
211
  Pipe_lib.Choosable_synchronous_pipe.iter sync_ledger_reader
×
212
    ~f:(fun (b_or_h, `Valid_cb vc) ->
213
      let header_with_hash, sender, transition_cache_element =
×
214
        match b_or_h with
215
        | `Block b_env ->
×
216
            ( Envelope.Incoming.data b_env
×
217
              |> Mina_block.Validation.block_with_hash
×
218
              |> With_hash.map ~f:Mina_block.header
×
219
            , Envelope.Incoming.remote_sender_exn b_env
×
220
            , Envelope.Incoming.map ~f:(fun x -> `Block x) b_env )
×
221
        | `Header h_env ->
×
222
            ( Envelope.Incoming.data h_env
×
223
              |> Mina_block.Validation.header_with_hash
×
224
            , Envelope.Incoming.remote_sender_exn h_env
×
225
            , Envelope.Incoming.map ~f:(fun x -> `Header x) h_env )
×
226
      in
227
      let previous_state_hash =
228
        With_hash.data header_with_hash
×
229
        |> Mina_block.Header.protocol_state
×
230
        |> Protocol_state.previous_state_hash
231
      in
232
      Transition_cache.add transition_graph ~parent:previous_state_hash
×
233
        (transition_cache_element, vc) ;
234
      (* TODO: Efficiently limiting the number of green threads in #1337 *)
235
      if
×
236
        worth_getting_root t
237
          (With_hash.map ~f:to_consensus_state header_with_hash)
×
238
      then (
×
239
        [%log trace] "Added the transition from sync_ledger_reader into cache"
×
240
          ~metadata:
241
            [ ( "state_hash"
242
              , State_hash.to_yojson
×
243
                  (State_hash.With_state_hashes.state_hash header_with_hash) )
×
244
            ; ( "header"
245
              , Mina_block.Header.to_yojson (With_hash.data header_with_hash) )
×
246
            ] ;
247

248
        Deferred.ignore_m
×
249
        @@ on_transition t ~sender ~root_sync_ledger header_with_hash )
×
250
      else Deferred.unit )
×
251

252
let external_transition_compare ~context:(module Context : CONTEXT) =
253
  let get_consensus_state =
×
254
    Fn.compose Protocol_state.consensus_state Mina_block.Header.protocol_state
255
  in
256
  Comparable.lift
×
257
    (fun existing candidate ->
258
      (* To prevent the logger from spamming a lot of messages, the logger input is set to null *)
259
      if
×
260
        State_hash.equal
261
          (State_hash.With_state_hashes.state_hash existing)
×
262
          (State_hash.With_state_hashes.state_hash candidate)
×
263
      then 0
×
264
      else if
×
265
        Consensus.Hooks.equal_select_status `Keep
266
        @@ Consensus.Hooks.select ~context:(module Context) ~existing ~candidate
267
      then -1
×
268
      else 1 )
×
269
    ~f:(With_hash.map ~f:get_consensus_state)
270

271
let download_snarked_ledger ~trust_system ~preferred_peers ~transition_graph
272
    ~sync_ledger_reader ~context t temp_snarked_ledger =
273
  time_deferred
×
274
    (let root_sync_ledger =
275
       Sync_ledger.Root.create temp_snarked_ledger ~context ~trust_system
276
     in
277
     don't_wait_for
×
278
       (sync_ledger t ~preferred:preferred_peers ~root_sync_ledger
×
279
          ~transition_graph ~sync_ledger_reader ) ;
280
     (* We ignore the resulting ledger returned here since it will always
281
        * be the same as the ledger we started with because we are syncing
282
        * a db ledger. *)
283
     let%map _, data = Sync_ledger.Root.valid_tree root_sync_ledger in
×
284
     Sync_ledger.Root.destroy root_sync_ledger ;
×
285
     data )
×
286

287
(** Run one bootstrap cycle *)
288
let run_cycle ~context:(module Context : CONTEXT) ~trust_system ~verifier
289
    ~network ~consensus_local_state ~network_transition_pipe ~preferred_peers
290
    ~persistent_root ~persistent_frontier ~initial_root_transition ~catchup_mode
291
    previous_cycles =
292
  let open Context in
×
293
  (* The short-lived pipe allocated here will be closed
294
     when a follow-up pipe is allocated: in the next cycle of bootstrap
295
     or when control is passed to the transition frontier controller.
296
     Because [Choosable_synchronous_pipe.t] is used, no data will be lost: any
297
     successful read is followed by non-blocking handling, and if the read/write
298
     pair is not consumed, it will continue in a follow-up pipe allocated in the
299
     next call to [Swappable.swap_reader].
300
  *)
301
  let%bind sync_ledger_reader = Swappable.swap_reader network_transition_pipe in
×
302
  let initial_root_transition =
×
303
    initial_root_transition |> Mina_block.Validated.remember
×
304
    |> Mina_block.Validation.reset_frontier_dependencies_validation
×
305
    |> Mina_block.Validation.reset_staged_ledger_diff_validation
306
  in
307
  let t =
×
308
    { network
309
    ; context = (module Context)
310
    ; trust_system
311
    ; verifier
312
    ; best_seen_transition = initial_root_transition
313
    ; current_root = initial_root_transition
314
    ; num_of_root_snarked_ledger_retargeted = 0
315
    }
316
  in
317
  let transition_graph = Transition_cache.create () in
318
  let temp_persistent_root_instance =
×
319
    Transition_frontier.Persistent_root.create_instance_exn persistent_root
320
  in
321
  let temp_snarked_ledger =
×
322
    Transition_frontier.Persistent_root.Instance.snarked_ledger
323
      temp_persistent_root_instance
324
  in
325
  (* step 1. download snarked_ledger *)
326
  let%bind sync_ledger_time, (hash, sender, expected_staged_ledger_hash) =
327
    download_snarked_ledger
×
328
      ~context:(module Context)
329
      ~trust_system ~preferred_peers ~transition_graph ~sync_ledger_reader t
330
      temp_snarked_ledger
331
  in
332
  Mina_metrics.(
×
333
    Counter.inc Bootstrap.root_snarked_ledger_sync_ms
×
334
      Time.Span.(to_ms sync_ledger_time)) ;
×
335
  Mina_metrics.(
336
    Gauge.set Bootstrap.num_of_root_snarked_ledger_retargeted
×
337
      (Float.of_int t.num_of_root_snarked_ledger_retargeted)) ;
×
338
  (* step 2. Download scan state and pending coinbases. *)
339
  let%bind ( staged_ledger_data_download_time
340
           , staged_ledger_construction_time
341
           , staged_ledger_aux_result ) =
342
    let%bind ( staged_ledger_data_download_time
343
             , staged_ledger_data_download_result ) =
344
      time_deferred
×
345
        (Mina_networking.get_staged_ledger_aux_and_pending_coinbases_at_hash
×
346
           t.network sender.peer_id hash )
347
    in
348
    match staged_ledger_data_download_result with
×
349
    | Error err ->
×
350
        Deferred.return (staged_ledger_data_download_time, None, Error err)
351
    | Ok
×
352
        ( scan_state_uncached
353
        , expected_merkle_root
354
        , pending_coinbases
355
        , protocol_states ) -> (
356
        let%map staged_ledger_construction_result =
357
          O1trace.thread "construct_root_staged_ledger" (fun () ->
×
358
              let open Deferred.Or_error.Let_syntax in
×
359
              let received_staged_ledger_hash =
360
                Staged_ledger_hash.of_aux_ledger_and_coinbase_hash
361
                  (Staged_ledger.Scan_state.Stable.Latest.hash
×
362
                     scan_state_uncached )
363
                  expected_merkle_root pending_coinbases
364
              in
365
              [%log debug]
×
366
                ~metadata:
367
                  [ ( "expected_staged_ledger_hash"
368
                    , Staged_ledger_hash.to_yojson expected_staged_ledger_hash
×
369
                    )
370
                  ; ( "received_staged_ledger_hash"
371
                    , Staged_ledger_hash.to_yojson received_staged_ledger_hash
×
372
                    )
373
                  ]
374
                "Comparing $expected_staged_ledger_hash to \
375
                 $received_staged_ledger_hash" ;
376
              let%bind new_root =
377
                t.current_root
378
                |> Mina_block.Validation.skip_frontier_dependencies_validation
×
379
                     `This_block_belongs_to_a_detached_subtree
380
                |> Mina_block.Validation.validate_staged_ledger_hash
×
381
                     (`Staged_ledger_already_materialized
382
                       received_staged_ledger_hash )
383
                |> Result.map_error ~f:(fun _ ->
×
384
                       Error.of_string "received faulty scan state from peer" )
×
385
                |> Deferred.return
×
386
              in
387
              let protocol_states =
×
388
                List.map protocol_states
389
                  ~f:(With_hash.of_data ~hash_data:Protocol_state.hashes)
390
              in
391
              let scan_state =
×
392
                Staged_ledger.Scan_state.write_all_proofs_to_disk
393
                  ~proof_cache_db scan_state_uncached
394
              in
395
              let%bind protocol_states =
396
                Staged_ledger.Scan_state.check_required_protocol_states
×
397
                  scan_state ~protocol_states
398
                |> Deferred.return
×
399
              in
400
              let protocol_states_map =
×
401
                protocol_states
402
                |> List.map ~f:(fun ps ->
×
403
                       (State_hash.With_state_hashes.state_hash ps, ps) )
×
404
                |> State_hash.Map.of_alist_exn
405
              in
406
              let get_state hash =
×
407
                match Map.find protocol_states_map hash with
×
408
                | None ->
×
409
                    let new_state_hash =
410
                      State_hash.With_state_hashes.state_hash (fst new_root)
×
411
                    in
412
                    [%log error]
×
413
                      ~metadata:
414
                        [ ("new_root", State_hash.to_yojson new_state_hash)
×
415
                        ; ("state_hash", State_hash.to_yojson hash)
×
416
                        ]
417
                      "Protocol state (for scan state transactions) for \
418
                       $state_hash not found when bootstrapping to the new \
419
                       root $new_root" ;
420
                    Or_error.errorf
×
421
                      !"Protocol state (for scan state transactions) for \
×
422
                        %{sexp:State_hash.t} not found when bootstrapping to \
423
                        the new root %{sexp:State_hash.t}"
424
                      hash new_state_hash
425
                | Some protocol_state ->
×
426
                    Ok (With_hash.data protocol_state)
×
427
              in
428
              (* step 3. Construct staged ledger from snarked ledger, scan state
429
                 and pending coinbases. *)
430
              (* Construct the staged ledger before constructing the transition
431
               * frontier in order to verify the scan state we received.
432
               * TODO: reorganize the code to avoid doing this twice (#3480) *)
433
              let open Deferred.Let_syntax in
434
              let%map staged_ledger_construction_time, construction_result =
435
                time_deferred
×
436
                  (let open Deferred.Let_syntax in
437
                  let temp_mask = Ledger.Root.as_masked temp_snarked_ledger in
438
                  let%map result =
439
                    Staged_ledger
440
                    .of_scan_state_pending_coinbases_and_snarked_ledger ~logger
441
                      ~snarked_local_state:
442
                        Mina_block.(
443
                          t.current_root |> Validation.block |> header
×
444
                          |> Header.protocol_state
×
445
                          |> Protocol_state.blockchain_state
×
446
                          |> Blockchain_state.snarked_local_state)
×
447
                      ~verifier ~constraint_constants ~scan_state
448
                      ~snarked_ledger:temp_mask ~expected_merkle_root
449
                      ~pending_coinbases ~get_state
450
                      ~signature_kind:Mina_signature_kind.t_DEPRECATED
451
                  in
452
                  ignore
×
453
                    ( Ledger.Maskable.unregister_mask_exn ~loc:__LOC__ temp_mask
×
454
                      : Ledger.unattached_mask ) ;
455
                  Result.map result
456
                    ~f:
457
                      (const
×
458
                         ( scan_state
459
                         , pending_coinbases
460
                         , new_root
461
                         , protocol_states ) ))
462
              in
463
              Ok (staged_ledger_construction_time, construction_result) )
×
464
        in
465
        match staged_ledger_construction_result with
×
466
        | Error err ->
×
467
            (staged_ledger_data_download_time, None, Error err)
468
        | Ok (staged_ledger_construction_time, result) ->
×
469
            ( staged_ledger_data_download_time
470
            , Some staged_ledger_construction_time
471
            , result ) )
472
  in
473
  Transition_frontier.Persistent_root.Instance.close
×
474
    temp_persistent_root_instance ;
475
  match staged_ledger_aux_result with
×
476
  | Error e ->
×
477
      let%map () =
478
        Trust_system.(
479
          record t.trust_system logger sender
×
480
            Actions.
481
              ( Outgoing_connection_error
482
              , Some
483
                  ( "Can't find scan state from the peer or received faulty \
484
                     scan state from the peer."
485
                  , [] ) ))
486
      in
487
      [%log error]
×
488
        ~metadata:
489
          [ ("error", Error_json.error_to_yojson e)
×
490
          ; ("state_hash", State_hash.to_yojson hash)
×
491
          ; ( "expected_staged_ledger_hash"
492
            , Staged_ledger_hash.to_yojson expected_staged_ledger_hash )
×
493
          ]
494
        "Failed to find scan state for the transition with hash $state_hash \
495
         from the peer or received faulty scan state: $error. Retry bootstrap" ;
496
      let this_cycle =
×
497
        { cycle_result = "failed to download and construct scan state"
498
        ; sync_ledger_time
499
        ; staged_ledger_data_download_time
500
        ; staged_ledger_construction_time
501
        ; local_state_sync_required = false
502
        ; local_state_sync_time = None
503
        }
504
      in
505
      `Repeat (this_cycle :: previous_cycles)
506
  | Ok (scan_state, pending_coinbase, new_root, protocol_states) -> (
×
507
      let%bind () =
508
        Trust_system.(
509
          record t.trust_system logger sender
×
510
            Actions.
511
              ( Fulfilled_request
512
              , Some ("Received valid scan state from peer", []) ))
513
      in
514
      let best_seen_block_with_hash, _ = t.best_seen_transition in
×
515
      let consensus_state =
516
        With_hash.data best_seen_block_with_hash
×
517
        |> Mina_block.header |> Mina_block.Header.protocol_state
×
518
        |> Protocol_state.consensus_state
519
      in
520
      (* step 4. Synchronize consensus local state if necessary *)
521
      let%bind ( local_state_sync_time
522
               , (local_state_sync_required, local_state_sync_result) ) =
523
        time_deferred
×
524
          ( match
525
              Consensus.Hooks.required_local_state_sync
526
                ~constants:precomputed_values.consensus_constants
527
                ~consensus_state ~local_state:consensus_local_state
528
            with
529
          | None ->
×
530
              [%log debug]
×
531
                ~metadata:
532
                  [ ( "local_state"
533
                    , Consensus.Data.Local_state.to_yojson consensus_local_state
×
534
                    )
535
                  ; ( "consensus_state"
536
                    , Consensus.Data.Consensus_state.Value.to_yojson
×
537
                        consensus_state )
538
                  ]
539
                "Not synchronizing consensus local state" ;
540
              Deferred.return (false, Or_error.return ())
×
541
          | Some sync_jobs ->
×
542
              [%log info] "Synchronizing consensus local state" ;
×
543
              let%map result =
544
                Consensus.Hooks.sync_local_state
×
545
                  ~context:(module Context)
546
                  ~local_state:consensus_local_state ~trust_system
547
                  ~glue_sync_ledger:(Mina_networking.glue_sync_ledger t.network)
×
548
                  sync_jobs
549
              in
550
              (true, result) )
×
551
      in
552
      match local_state_sync_result with
×
553
      | Error e ->
×
554
          [%log error]
×
555
            ~metadata:[ ("error", Error_json.error_to_yojson e) ]
×
556
            "Local state sync failed: $error. Retry bootstrap" ;
557
          let this_cycle =
×
558
            { cycle_result = "failed to synchronize local state"
559
            ; sync_ledger_time
560
            ; staged_ledger_data_download_time
561
            ; staged_ledger_construction_time
562
            ; local_state_sync_required
563
            ; local_state_sync_time = Some local_state_sync_time
564
            }
565
          in
566
          Deferred.return (`Repeat (this_cycle :: previous_cycles))
567
      | Ok () ->
×
568
          (* step 5. Close the old frontier and reload a new one from disk. *)
569
          let new_root_data : Transition_frontier.Root_data.Limited.t =
570
            Transition_frontier.Root_data.Limited.create
571
              ~transition:(Mina_block.Validated.lift new_root)
×
572
              ~scan_state ~pending_coinbase ~protocol_states
573
          in
574
          let%bind () =
575
            Transition_frontier.Persistent_frontier.reset_database_exn
×
576
              persistent_frontier ~root_data:new_root_data
577
              ~genesis_state_hash:
578
                (State_hash.With_state_hashes.state_hash
×
579
                   precomputed_values.protocol_state_with_hashes )
580
          in
581
          (* TODO: lazy load db in persistent root to avoid unnecessary opens like this *)
582
          Transition_frontier.Persistent_root.(
×
583
            with_instance_exn persistent_root ~f:(fun instance ->
×
584
                Instance.set_root_state_hash instance
×
585
                @@ Mina_block.Validated.state_hash
×
586
                @@ Mina_block.Validated.lift new_root )) ;
×
587
          let%map new_frontier =
588
            let fail msg =
589
              failwith
×
590
                ( "failed to initialize transition frontier after \
591
                   bootstrapping: " ^ msg )
592
            in
593
            Transition_frontier.load
×
594
              ~context:(module Context)
595
              ~retry_with_fresh_db:false ~verifier ~consensus_local_state
596
              ~persistent_root ~persistent_frontier ~catchup_mode ()
597
            >>| function
×
598
            | Ok frontier ->
×
599
                frontier
600
            | Error (`Failure msg) ->
×
601
                fail msg
602
            | Error `Bootstrap_required ->
×
603
                fail
604
                  "bootstrap still required (indicates logical error in code)"
605
            | Error `Persistent_frontier_malformed ->
×
606
                fail "persistent frontier was malformed"
607
            | Error `Snarked_ledger_mismatch ->
×
608
                fail
609
                  "this should not happen, because we just reset the \
610
                   snarked_ledger"
611
          in
612
          [%str_log info] Bootstrap_complete ;
×
613
          let collected_transitions = Transition_cache.data transition_graph in
×
614
          let logger =
×
615
            Logger.extend logger
616
              [ ("context", `String "Filter collected transitions in bootstrap")
617
              ]
618
          in
619
          let root_consensus_state =
×
620
            Transition_frontier.(
621
              Breadcrumb.consensus_state_with_hashes (root new_frontier))
×
622
          in
623
          let filtered_collected_transitions =
624
            List.filter collected_transitions
625
              ~f:(fun (incoming_transition, _) ->
626
                let transition =
×
627
                  Envelope.Incoming.data incoming_transition
×
628
                  |> Transition_cache.header_with_hash
629
                in
630
                Consensus.Hooks.equal_select_status `Take
×
631
                @@ Consensus.Hooks.select
632
                     ~context:(module Context)
633
                     ~existing:root_consensus_state
634
                     ~candidate:
635
                       (With_hash.map
×
636
                          ~f:
637
                            (Fn.compose Protocol_state.consensus_state
×
638
                               Mina_block.Header.protocol_state )
639
                          transition ) )
640
          in
641
          [%log debug] "Sorting filtered transitions by consensus state"
×
642
            ~metadata:[] ;
643
          let sorted_filtered_collected_transitions =
×
644
            O1trace.sync_thread "sorting_collected_transitions" (fun () ->
645
                List.sort filtered_collected_transitions
×
646
                  ~compare:
647
                    (Comparable.lift
×
648
                       ~f:(fun (x, _) ->
649
                         Transition_cache.header_with_hash
×
650
                         @@ Envelope.Incoming.data x )
×
651
                       (external_transition_compare ~context:(module Context)) ) )
652
          in
653
          let this_cycle =
×
654
            { cycle_result = "success"
655
            ; sync_ledger_time
656
            ; staged_ledger_data_download_time
657
            ; staged_ledger_construction_time
658
            ; local_state_sync_required
659
            ; local_state_sync_time = Some local_state_sync_time
660
            }
661
          in
662
          `Finished
663
            ( this_cycle :: previous_cycles
664
            , (new_frontier, sorted_filtered_collected_transitions) ) )
665

666
(** The entry point function for bootstrap controller. When bootstrap finishes,
667
    it returns a transition frontier with the root breadcrumb and a list
668
    of transitions collected during bootstrap.
669

670
    Bootstrap controller would do the following steps to construct the
671
    transition frontier:
672
    1. Download the root snarked_ledger.
673
    2. Download the scan state and pending coinbases.
674
    3. Construct the staged ledger from the snarked ledger, scan state and
675
       pending coinbases.
676
    4. Synchronize the consensus local state if necessary.
677
    5. Close the old frontier and reload a new one from disk.
678
 *)
679
let run ~context:(module Context : CONTEXT) ~trust_system ~verifier ~network
680
    ~consensus_local_state ~network_transition_pipe ~preferred_peers
681
    ~persistent_root ~persistent_frontier ~initial_root_transition ~catchup_mode
682
    =
683
  let open Context in
×
684
  let run_cycle =
685
    run_cycle
686
      ~context:(module Context : CONTEXT)
687
      ~trust_system ~verifier ~network ~consensus_local_state
688
      ~network_transition_pipe ~preferred_peers ~persistent_root
689
      ~persistent_frontier ~initial_root_transition ~catchup_mode
690
  in
691
  O1trace.thread "bootstrap"
692
  @@ fun () ->
693
  let%map time_elapsed, (cycles, result) =
694
    time_deferred @@ Deferred.repeat_until_finished [] run_cycle
×
695
  in
696
  [%log info] "Bootstrap completed in $time_elapsed: $bootstrap_stats"
×
697
    ~metadata:
698
      [ ("time_elapsed", time_to_yojson time_elapsed)
×
699
      ; ( "bootstrap_stats"
700
        , `List (List.map ~f:bootstrap_cycle_stats_to_yojson cycles) )
×
701
      ] ;
702
  Mina_metrics.(
×
703
    Gauge.set Bootstrap.bootstrap_time_ms Core.Time.(Span.to_ms @@ time_elapsed)) ;
×
704
  result
705

706
let%test_module "Bootstrap_controller tests" =
707
  ( module struct
708
    (* Global maximum frontier length derived from the unit-test genesis
       constants; the tests in this module size their frontiers and branches
       from it. *)
    let max_frontier_length =
      Genesis_constants.For_unit_tests.t
      |> Transition_frontier.global_max_length
×
710

711
    (* Logger obtained from [Logger.null]; shared by every fixture and test in
       this module. *)
    let logger = Logger.null ()
×
712

713
    (* Disable log messages from the best_tip_diff logger by registering a
       consumer whose transport ignores every message it is given. *)
    let () =
      let module Discard = struct
        type t = unit

        let transport () _ = ()
      end in
      let transport = Logger.Transport.create (module Discard) () in
      Logger.Consumer_registry.register ~commit_id:""
        ~id:Logger.Logger_id.best_tip_diff
        ~processor:(Logger.Processor.raw ())
        ~transport ()
726

727
    (* Trust system built with [Trust_system.null]. Its upcall pipe is read in
       the background by a handler that does nothing with each item. *)
    let trust_system =
      let trust_system = Trust_system.null () in
      let upcalls = Trust_system.upcall_pipe trust_system in
      don't_wait_for
        (Pipe_lib.Strict_pipe.Reader.iter upcalls ~f:(fun _ -> Deferred.unit)) ;
      trust_system
×
734

735
    (* Force the lazy unit-test precomputed values once, when the test module
       is initialised, so that all fixtures below share the same value. *)
    let precomputed_values = Lazy.force Precomputed_values.for_unit_tests
×
736

737
    (* Proof level of the unit-test precomputed values; passed to the test
       verifier. *)
    let proof_level = precomputed_values.proof_level
738

739
    (* Constraint constants of the unit-test precomputed values; used to build
       the test verifier and by the staged-ledger reconstruction test. *)
    let constraint_constants = precomputed_values.constraint_constants
740

741
    (* Ledger sync configuration for the fake networks: unit-test compile
       config, with neither subtree depth overridden. *)
    let ledger_sync_config =
      let compile_config = Mina_compile_config.For_unit_tests.t in
      Syncable_ledger.create_config ~compile_config ~max_subtree_depth:None
        ~default_subtree_depth:None ()
745

746
    (* Implementation of the context that the tests hand to the bootstrap
       controller and to the helpers it calls. Where a module-level fixture
       already exists it is reused, so that the controller and the fake
       network are configured from the same values. *)
    module Context = struct
      let logger = logger

      let precomputed_values = precomputed_values

      (* NOTE(review): this reads the unit-test constants directly, whereas
         the module-level [constraint_constants] (given to the verifier) is
         taken from [precomputed_values]; presumably the two are equal --
         confirm. *)
      let constraint_constants =
        Genesis_constants.For_unit_tests.Constraint_constants.t

      let consensus_constants = precomputed_values.consensus_constants

      (* Reuse the module-level configuration, which is also what the fake
         network generator receives, instead of building an identical second
         copy here. *)
      let ledger_sync_config = ledger_sync_config

      let proof_cache_db = Proof_cache_tag.For_tests.create_db ()
    end
763

764
    (* Test verifier, created by blocking on its asynchronous constructor
       while the test module is being initialised. *)
    let verifier =
      let create_verifier () =
        Verifier.For_tests.default ~constraint_constants ~logger ~proof_level ()
      in
      Async.Thread_safe.block_on_async_exn create_verifier
768

769
    (* Unpack the first-class genesis ledger module carried by the precomputed
       values; its [accounts] are used when generating branches of blocks. *)
    module Genesis_ledger = (val precomputed_values.genesis_ledger)
770

771
    (* Turn a validated transition back into one whose frontier-dependencies
       and staged-ledger-diff validations have been reset, wrapped in an
       incoming envelope marked as sent by the remote peer [sender]. *)
    let downcast_transition ~sender transition =
      let remembered = Mina_block.Validated.remember transition in
      let without_frontier_dependencies =
        Mina_block.Validation.reset_frontier_dependencies_validation remembered
      in
      let data =
        Mina_block.Validation.reset_staged_ledger_diff_validation
          without_frontier_dependencies
      in
      Envelope.Incoming.wrap ~data ~sender:(Envelope.Sender.Remote sender)
779

780
    (* Same as [downcast_transition], applied to the validated transition
       held by [breadcrumb]. *)
    let downcast_breadcrumb ~sender breadcrumb =
      breadcrumb |> Transition_frontier.Breadcrumb.validated_transition
      |> downcast_transition ~sender
×
783

784
    (* Build a bootstrap controller state without starting a bootstrap. Both
       the best seen transition and the current root are [genesis_root] with
       its frontier-dependencies and staged-ledger-diff validations reset, and
       the retarget counter starts at zero. *)
    let make_non_running_bootstrap ~genesis_root ~network =
      let initial_block =
        Mina_block.Validation.reset_staged_ledger_diff_validation
          (Mina_block.Validation.reset_frontier_dependencies_validation
             genesis_root )
      in
      { context = (module Context)
      ; trust_system
      ; verifier
      ; network
      ; current_root = initial_block
      ; best_seen_transition = initial_block
      ; num_of_root_snarked_ledger_retargeted = 0
      }
798

799
    (* Writes a generated branch of blocks into the sync-ledger pipe while
       [sync_ledger] is running, then checks that the transition cache holds
       exactly the headers of that branch. *)
    let%test_unit "Bootstrap controller caches all transitions it is passed \
                   through the transition_reader" =
      let module Sync_pipe = Pipe_lib.Choosable_synchronous_pipe in
      let branch_size = (max_frontier_length * 2) + 2 in
      let network_and_branch_gen =
        let open Quickcheck.Generator.Let_syntax in
        (* A single node is all this test needs, but mina_networking throws an
           error unless there is more than one peer. *)
        let%bind fake_network =
          Fake_network.Generator.(
            gen ~precomputed_values ~verifier ~max_frontier_length
              ~ledger_sync_config [ fresh_peer; fresh_peer ])
        in
        let%map grow_branch =
          Transition_frontier.Breadcrumb.For_tests.gen_seq ~precomputed_values
            ~verifier
            ~accounts_with_secret_keys:(Lazy.force Genesis_ledger.accounts)
            branch_size
        in
        let [ me; _ ] = fake_network.peer_networks in
        (* The branch is grown on top of the root of our own frontier. *)
        let branch =
          Async.Thread_safe.block_on_async_exn (fun () ->
              grow_branch (Transition_frontier.root me.state.frontier) )
        in
        (fake_network, branch)
      in
      Quickcheck.test ~trials:1 network_and_branch_gen
        ~f:(fun (fake_network, branch) ->
          let [ me; other ] = fake_network.peer_networks in
          let genesis_root =
            me.state.frontier |> Transition_frontier.root
            |> Transition_frontier.Breadcrumb.validated_transition
            |> Mina_block.Validated.remember
          in
          let cache = Transition_cache.create () in
          let sync_ledger_reader, first_writer = Sync_pipe.create () in
          let bootstrap =
            make_non_running_bootstrap ~genesis_root ~network:me.network
          in
          let root_sync_ledger =
            Sync_ledger.Root.create
              (Transition_frontier.root_snarked_ledger me.state.frontier)
              ~context:(module Context)
              ~trust_system
          in
          (* Write one breadcrumb, as a block sent by the other peer; the
             write yields the writer to use for the next breadcrumb. *)
          let feed writer breadcrumb =
            let block = downcast_breadcrumb ~sender:other.peer breadcrumb in
            Sync_pipe.write writer (`Block block, `Valid_cb None)
          in
          Async.Thread_safe.block_on_async_exn (fun () ->
              (* [sync_ledger] is started before anything is written;
                 presumably the pipe needs its reader running for the writes
                 to go through -- TODO confirm. *)
              let syncing =
                sync_ledger bootstrap ~root_sync_ledger ~transition_graph:cache
                  ~preferred:[] ~sync_ledger_reader
              in
              let%bind last_writer =
                Deferred.List.fold branch ~init:first_writer ~f:feed
              in
              Sync_pipe.close last_writer ;
              syncing ) ;
          let expected_headers =
            List.map branch ~f:(fun breadcrumb ->
                breadcrumb
                |> Transition_frontier.Breadcrumb.validated_transition
                |> Mina_block.Validated.remember
                |> Mina_block.Validation.block_with_hash
                |> With_hash.map ~f:Mina_block.header )
          in
          let cached_headers =
            List.map (Transition_cache.data cache) ~f:(fun (envelope, _) ->
                Transition_cache.header_with_hash
                  (Envelope.Incoming.data envelope) )
          in
          (* Sets of headers ordered by [external_transition_compare], so that
             the comparison ignores the order in which blocks were cached. *)
          let module Header_cmp = struct
            module Ordered = struct
              type t = Mina_block.Header.t State_hash.With_state_hashes.t
              [@@deriving sexp]

              let compare = external_transition_compare ~context:(module Context)
            end

            include Comparable.Make (Ordered)
          end in
          [%test_result: Header_cmp.Set.t]
            (Header_cmp.Set.of_list cached_headers)
            ~expect:(Header_cmp.Set.of_list expected_headers) )
×
884

885
    (* Close the frontier of [my_net] and run a full bootstrap for it, failing
       if the bootstrap does not complete within [timeout_duration]. *)
    let run_bootstrap ~timeout_duration ~my_net ~network_transition_pipe =
      let open Fake_network in
      let frontier = my_net.state.frontier in
      let time_controller = Block_time.Controller.basic ~logger in
      (* Everything [run] needs from the old frontier is read before the
         frontier is closed. *)
      let persistent_root = Transition_frontier.persistent_root frontier in
      let persistent_frontier =
        Transition_frontier.persistent_frontier frontier
      in
      let initial_root_transition =
        frontier |> Transition_frontier.root
        |> Transition_frontier.Breadcrumb.validated_transition
      in
      let%bind () = Transition_frontier.close ~loc:__LOC__ frontier in
      [%log info] "bootstrap begin" ;
      let bootstrapping =
        run
          ~context:(module Context)
          ~trust_system ~verifier ~network:my_net.network
          ~consensus_local_state:my_net.state.consensus_local_state
          ~network_transition_pipe ~preferred_peers:[] ~persistent_root
          ~persistent_frontier ~initial_root_transition ~catchup_mode:`Super
      in
      Block_time.Timeout.await_exn time_controller ~timeout_duration
        bootstrapping
909

910
    (* Check that the blockchain lengths of [incoming_transitions] never
       decrease from one element to the next, starting from the length of the
       block held by [root].

       Raises the error "The blocks are not sorted in increasing order" at the
       first element that is shorter than its predecessor. *)
    let assert_transitions_increasingly_sorted ~root
        (incoming_transitions : Transition_cache.element list) =
      let chain_length_of header =
        header |> Mina_block.Header.protocol_state
        |> Protocol_state.consensus_state
        |> Consensus.Data.Consensus_state.blockchain_length
      in
      (* One fold step: accept the element if it is at least as long as the
         previous header, and make its header the next reference point. *)
      let check_next previous (element : Transition_cache.element) =
        let With_hash.{ data = current; _ } =
          element |> fst |> Envelope.Incoming.data
          |> Transition_cache.header_with_hash
        in
        if
          Mina_numbers.Length.(
            chain_length_of previous <= chain_length_of current)
        then Ok current
        else
          Error (Error.of_string "The blocks are not sorted in increasing order")
      in
      let root_header =
        Mina_block.header (Transition_frontier.Breadcrumb.block root)
      in
      let (_ : Mina_block.Header.t) =
        List.fold_result incoming_transitions ~init:root_header ~f:check_next
        |> Or_error.ok_exn
      in
      ()
938

939
    (* Bootstraps a fresh node against a single peer that holds a long branch,
       after the peer's best tip has been queued on the network transition
       pipe. The collected transitions must come back sorted, and the new
       root snarked ledger must have the same merkle root as the peer's. *)
    let%test_unit "sync with one node after receiving a transition" =
      let network_gen =
        Fake_network.Generator.(
          gen ~precomputed_values ~verifier ~max_frontier_length
            ~ledger_sync_config
            [ fresh_peer
            ; peer_with_branch
                ~frontier_branch_size:((max_frontier_length * 2) + 2)
            ])
      in
      let snarked_root_hash frontier =
        Ledger.Root.merkle_root
          (Transition_frontier.root_snarked_ledger frontier)
      in
      Quickcheck.test ~trials:1 network_gen ~f:(fun fake_network ->
          let [ my_net; peer_net ] = fake_network.peer_networks in
          (* The peer's best tip, downcast and wrapped as a block received
             from that peer. *)
          let block =
            downcast_breadcrumb ~sender:peer_net.peer
              (Transition_frontier.best_tip peer_net.state.frontier)
          in
          let network_transition_pipe =
            Swappable.create ~name:(__MODULE__ ^ __LOC__)
              (Buffered (`Capacity 10, `Overflow (Drop_head ignore)))
          in
          Swappable.write network_transition_pipe (`Block block, `Valid_cb None) ;
          let new_frontier, sorted_external_transitions =
            Async.Thread_safe.block_on_async_exn (fun () ->
                run_bootstrap ~my_net ~network_transition_pipe
                  ~timeout_duration:(Block_time.Span.of_ms 30_000L) )
          in
          assert_transitions_increasingly_sorted sorted_external_transitions
            ~root:(Transition_frontier.root new_frontier) ;
          [%test_result: Ledger_hash.t]
            (snarked_root_hash new_frontier)
            ~expect:(snarked_root_hash peer_net.state.frontier) )
981

982
    (* For every breadcrumb of a generated frontier, rebuilds its staged
       ledger from the root snarked ledger, the breadcrumb's scan state and
       its pending coinbases, and checks that the rebuilt ledger has the
       staged ledger hash recorded in the breadcrumb. *)
    let%test_unit "reconstruct staged_ledgers using \
                   of_scan_state_and_snarked_ledger" =
      let check_breadcrumb frontier breadcrumb =
        let module Breadcrumb = Transition_frontier.Breadcrumb in
        let original = Breadcrumb.staged_ledger breadcrumb in
        let expected_merkle_root =
          Ledger.merkle_root (Staged_ledger.ledger original)
        in
        let snarked_ledger =
          Ledger.Root.as_masked
            (Transition_frontier.root_snarked_ledger frontier)
        in
        let snarked_local_state =
          let root_state =
            Breadcrumb.protocol_state (Transition_frontier.root frontier)
          in
          Blockchain_state.snarked_local_state
            (Protocol_state.blockchain_state root_state)
        in
        let scan_state = Staged_ledger.scan_state original in
        (* Protocol states are looked up in the frontier itself. *)
        let get_state hash =
          match Transition_frontier.find_protocol_state frontier hash with
          | None ->
              Or_error.errorf
                !"Protocol state (for scan state transactions) for \
                  %{sexp:State_hash.t} not found"
                hash
          | Some protocol_state ->
              Ok protocol_state
        in
        let pending_coinbases =
          Staged_ledger.pending_coinbase_collection original
        in
        let%map rebuilt =
          Deferred.Or_error.ok_exn
            (Staged_ledger.of_scan_state_pending_coinbases_and_snarked_ledger
               ~scan_state ~logger ~verifier ~constraint_constants
               ~snarked_ledger ~snarked_local_state ~expected_merkle_root
               ~pending_coinbases ~get_state
               ~signature_kind:Mina_signature_kind.t_DEPRECATED )
        in
        (* The height only serves to identify the breadcrumb in the failure
           message. *)
        let height =
          Mina_numbers.Length.to_int
            (Consensus.Data.Consensus_state.blockchain_length
               (Breadcrumb.consensus_state breadcrumb) )
        in
        [%test_eq: Staged_ledger_hash.t]
          ~message:(sprintf "mismatch of staged ledger hash height %d" height)
          (Breadcrumb.staged_ledger_hash breadcrumb)
          (Staged_ledger.hash rebuilt)
      in
      Quickcheck.test ~trials:1
        (Transition_frontier.For_tests.gen ~precomputed_values ~verifier
           ~max_length:max_frontier_length ~size:max_frontier_length () )
        ~f:(fun frontier ->
          Thread_safe.block_on_async_exn (fun () ->
              Deferred.List.iter
                (Transition_frontier.all_breadcrumbs frontier)
                ~f:(check_breadcrumb frontier) ) )
×
1040

1041
    (*
1042
    let%test_unit "if we see a new transition that is better than the \
1043
                   transition that we are syncing from, then we should \
1044
                   retarget our root" =
1045
      Quickcheck.test ~trials:1
1046
        Fake_network.Generator.(
1047
          gen ~max_frontier_length
1048
            [ fresh_peer
1049
            ; peer_with_branch ~frontier_branch_size:max_frontier_length
1050
            ; peer_with_branch
1051
                ~frontier_branch_size:((max_frontier_length * 2) + 2) ])
1052
        ~f:(fun fake_network ->
1053
          let [me; weaker_chain; stronger_chain] =
1054
            fake_network.peer_networks
1055
          in
1056
          let transition_reader, transition_writer =
1057
            Pipe_lib.Strict_pipe.create ~name:(__MODULE__ ^ __LOC__)
1058
              (Buffered (`Capacity 10, `Overflow Drop_head))
1059
          in
1060
          Envelope.Incoming.wrap
1061
            ~data:
1062
              ( Transition_frontier.best_tip weaker_chain.state.frontier
1063
              |> Transition_frontier.Breadcrumb.validated_transition
1064
              |> Mina_block.Validated.to_initial_validated )
1065
            ~sender:
1066
              (Envelope.Sender.Remote
1067
                 (weaker_chain.peer.host, weaker_chain.peer.peer_id))
1068
          |> Pipe_lib.Strict_pipe.Writer.write transition_writer ;
1069
          Envelope.Incoming.wrap
1070
            ~data:
1071
              ( Transition_frontier.best_tip stronger_chain.state.frontier
1072
              |> Transition_frontier.Breadcrumb.validated_transition
1073
              |> Mina_block.Validated.to_initial_validated )
1074
            ~sender:
1075
              (Envelope.Sender.Remote
1076
                 (stronger_chain.peer.host, stronger_chain.peer.peer_id))
1077
          |> Pipe_lib.Strict_pipe.Writer.write transition_writer ;
1078
          let new_frontier, sorted_external_transitions =
1079
            Async.Thread_safe.block_on_async_exn (fun () ->
1080
                run_bootstrap
1081
                  ~timeout_duration:(Block_time.Span.of_ms 60_000L)
1082
                  ~my_net:me ~transition_reader )
1083
          in
1084
          assert_transitions_increasingly_sorted
1085
            ~root:(Transition_frontier.root new_frontier)
1086
            sorted_external_transitions ;
1087
          [%test_result: Ledger_hash.t]
1088
            ( Ledger.Db.merkle_root
1089
            @@ Transition_frontier.root_snarked_ledger new_frontier )
1090
            ~expect:
1091
              ( Ledger.Db.merkle_root
1092
              @@ Transition_frontier.root_snarked_ledger
1093
                   stronger_chain.state.frontier ) )
1094
*)
1095
  end )
88✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc