• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

MinaProtocol / mina / 434

29 Jul 2025 03:33PM UTC coverage: 31.192% (-2.1%) from 33.242%
434

push

buildkite

web-flow
Merge pull request #17590 from MinaProtocol/georgeee/merge-compatible-to-develop-29jul2025

Merge compatible to develop (29 Jul 2025)

80 of 143 new or added lines in 20 files covered. (55.94%)

656 existing lines in 26 files now uncovered.

3341 of 10711 relevant lines covered (31.19%)

23853.25 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.55
/src/lib/bootstrap_controller/bootstrap_controller.ml
1
(* Only show stdout for failed inline tests. *)
44✔
2
open Core
3
open Async
4
open Mina_base
5
module Ledger = Mina_ledger.Ledger
6
module Sync_ledger = Mina_ledger.Sync_ledger
7
open Mina_state
8
open Pipe_lib.Strict_pipe
9
open Network_peer
10
module Transition_cache = Transition_cache
11

12
module type CONTEXT = sig
13
  val logger : Logger.t
14

15
  val precomputed_values : Precomputed_values.t
16

17
  val constraint_constants : Genesis_constants.Constraint_constants.t
18

19
  val consensus_constants : Consensus.Constants.t
20

21
  val ledger_sync_config : Syncable_ledger.daemon_config
22

23
  val proof_cache_db : Proof_cache_tag.cache_db
24
end
25

26
type Structured_log_events.t += Bootstrap_complete
27
  [@@deriving register_event { msg = "Bootstrap state: complete." }]
×
28

29
type t =
30
  { context : (module CONTEXT)
31
  ; trust_system : Trust_system.t
32
  ; verifier : Verifier.t
33
  ; mutable best_seen_transition : Mina_block.initial_valid_block
34
  ; mutable current_root : Mina_block.initial_valid_block
35
  ; network : Mina_networking.t
36
  ; mutable num_of_root_snarked_ledger_retargeted : int
37
  }
38

39
type time = Time.Span.t
40

41
let time_to_yojson span =
42
  `String (Printf.sprintf "%f seconds" (Time.Span.to_sec span))
×
43

44
type opt_time = time option
45

46
let opt_time_to_yojson = function
47
  | Some time ->
×
48
      time_to_yojson time
49
  | None ->
×
50
      `Null
51

52
(** An auxiliary data structure for collecting various metrics for bootstrap controller. *)
53
type bootstrap_cycle_stats =
×
54
  { cycle_result : string
×
55
  ; sync_ledger_time : time
×
56
  ; staged_ledger_data_download_time : time
×
57
  ; staged_ledger_construction_time : opt_time
×
58
  ; local_state_sync_required : bool
×
59
  ; local_state_sync_time : opt_time
×
60
  }
61
[@@deriving to_yojson]
62

63
let time_deferred deferred =
64
  let start_time = Time.now () in
×
65
  let%map result = deferred in
66
  let end_time = Time.now () in
×
67
  (Time.diff end_time start_time, result)
×
68

69
let worth_getting_root ({ context = (module Context); _ } as t) candidate =
70
  let module Consensus_context = struct
×
71
    include Context
72

73
    let logger =
74
      Logger.extend logger
×
75
        [ ( "selection_context"
76
          , `String "Bootstrap_controller.worth_getting_root" )
77
        ]
78
  end in
79
  Consensus.Hooks.equal_select_status `Take
80
  @@ Consensus.Hooks.select
81
       ~context:(module Consensus_context)
82
       ~existing:
83
         ( t.best_seen_transition |> Mina_block.Validation.block_with_hash
×
84
         |> With_hash.map ~f:Mina_block.consensus_state )
×
85
       ~candidate
86

87
let received_bad_proof ({ context = (module Context); _ } as t) host e =
88
  let open Context in
×
89
  Trust_system.(
90
    record t.trust_system logger host
91
      Actions.
92
        ( Violated_protocol
93
        , Some
94
            ( "Bad ancestor proof: $error"
95
            , [ ("error", Error_json.error_to_yojson e) ] ) ))
×
96

97
let done_syncing_root root_sync_ledger =
NEW
98
  Option.is_some (Sync_ledger.Any_ledger.peek_valid_tree root_sync_ledger)
×
99

100
let should_sync ~root_sync_ledger t candidate_state =
101
  (not @@ done_syncing_root root_sync_ledger)
×
102
  && worth_getting_root t candidate_state
×
103

104
(** Update [Synced_ledger]'s target and [best_seen_transition] and [current_root] accordingly. *)
105
let start_sync_job_with_peer ~sender ~root_sync_ledger
106
    ({ context = (module Context); _ } as t) peer_best_tip peer_root =
107
  let open Context in
×
108
  let%bind () =
109
    Trust_system.(
110
      record t.trust_system logger sender
×
111
        Actions.
112
          ( Fulfilled_request
113
          , Some ("Received verified peer root and best tip", []) ))
114
  in
115
  t.best_seen_transition <- peer_best_tip ;
×
116
  t.current_root <- peer_root ;
117
  let blockchain_state =
118
    t.current_root |> Mina_block.Validation.block |> Mina_block.header
×
119
    |> Mina_block.Header.protocol_state |> Protocol_state.blockchain_state
×
120
  in
121
  let expected_staged_ledger_hash =
×
122
    blockchain_state |> Blockchain_state.staged_ledger_hash
123
  in
124
  let snarked_ledger_hash =
×
125
    blockchain_state |> Blockchain_state.snarked_ledger_hash
126
  in
127
  return
×
128
  @@
129
  match
130
    Sync_ledger.Any_ledger.new_goal root_sync_ledger
131
      (Frozen_ledger_hash.to_ledger_hash snarked_ledger_hash)
×
132
      ~data:
133
        ( State_hash.With_state_hashes.state_hash
×
134
          @@ Mina_block.Validation.block_with_hash t.current_root
×
135
        , sender
136
        , expected_staged_ledger_hash )
137
      ~equal:(fun (hash1, _, _) (hash2, _, _) -> State_hash.equal hash1 hash2)
×
138
  with
139
  | `New ->
×
140
      t.num_of_root_snarked_ledger_retargeted <-
141
        t.num_of_root_snarked_ledger_retargeted + 1 ;
142
      `Syncing_new_snarked_ledger
143
  | `Update_data ->
×
144
      `Updating_root_transition
145
  | `Repeat ->
×
146
      `Ignored
147

148
let to_consensus_state h =
149
  Mina_block.Header.protocol_state h |> Protocol_state.consensus_state
×
150

151
(** For each transition, this function would compare it with the existing one.
152
    If the incoming transition is better, then download the merkle list from
153
    that transition to its root and verify it. If we get a better root than
154
    the existing one, then reset the Sync_ledger's target by calling
155
    [start_sync_job_with_peer] function. *)
156
let on_transition ({ context = (module Context); _ } as t) ~sender
157
    ~root_sync_ledger candidate_header =
158
  let open Context in
×
159
  let candidate_consensus_state =
160
    With_hash.map ~f:to_consensus_state candidate_header
161
  in
162
  if not @@ should_sync ~root_sync_ledger t candidate_consensus_state then
×
163
    Deferred.return `Ignored
×
164
  else
165
    match%bind
166
      Mina_networking.get_ancestry t.network sender.Peer.peer_id
×
167
        (With_hash.map_hash candidate_consensus_state
×
168
           ~f:State_hash.State_hashes.state_hash )
169
    with
170
    | Error e ->
×
171
        [%log error]
×
172
          ~metadata:[ ("error", Error_json.error_to_yojson e) ]
×
173
          !"Could not get the proof of the root transition from the network: \
174
            $error" ;
175
        Deferred.return `Ignored
×
176
    | Ok peer_root_with_proof -> (
×
177
        let pcd =
178
          peer_root_with_proof.data
179
          |> Proof_carrying_data.map
×
180
               ~f:(Mina_block.write_all_proofs_to_disk ~proof_cache_db)
181
          |> Proof_carrying_data.map_proof
182
               ~f:
183
                 (Tuple2.map_snd
184
                    ~f:(Mina_block.write_all_proofs_to_disk ~proof_cache_db) )
185
        in
186
        match%bind
187
          Mina_block.verify_on_header
×
188
            ~verify:
189
              (Sync_handler.Root.verify
×
190
                 ~context:(module Context)
191
                 ~verifier:t.verifier candidate_consensus_state )
192
            pcd
193
        with
194
        | Ok (`Root root, `Best_tip best_tip) ->
×
195
            if done_syncing_root root_sync_ledger then return `Ignored
×
196
            else
197
              start_sync_job_with_peer ~sender ~root_sync_ledger t best_tip root
×
198
        | Error e ->
×
199
            return (received_bad_proof t sender e |> Fn.const `Ignored) )
×
200

201
(** A helper function that wraps the calls to Sync_ledger and iterate through
202
    incoming transitions, add those to the transition_cache and calls
203
    [on_transition] function. *)
204
let sync_ledger ({ context = (module Context); _ } as t) ~preferred
205
    ~root_sync_ledger ~transition_graph ~sync_ledger_reader =
206
  let open Context in
×
207
  let query_reader = Sync_ledger.Any_ledger.query_reader root_sync_ledger in
NEW
208
  let response_writer = Sync_ledger.Any_ledger.answer_writer root_sync_ledger in
×
UNCOV
209
  Mina_networking.glue_sync_ledger ~preferred t.network query_reader
×
210
    response_writer ;
211
  Pipe_lib.Choosable_synchronous_pipe.iter sync_ledger_reader
×
212
    ~f:(fun (b_or_h, `Valid_cb vc) ->
213
      let header_with_hash, sender, transition_cache_element =
×
214
        match b_or_h with
215
        | `Block b_env ->
×
216
            ( Envelope.Incoming.data b_env
×
217
              |> Mina_block.Validation.block_with_hash
×
218
              |> With_hash.map ~f:Mina_block.header
×
219
            , Envelope.Incoming.remote_sender_exn b_env
×
220
            , Envelope.Incoming.map ~f:(fun x -> `Block x) b_env )
×
221
        | `Header h_env ->
×
222
            ( Envelope.Incoming.data h_env
×
223
              |> Mina_block.Validation.header_with_hash
×
224
            , Envelope.Incoming.remote_sender_exn h_env
×
225
            , Envelope.Incoming.map ~f:(fun x -> `Header x) h_env )
×
226
      in
227
      let previous_state_hash =
228
        With_hash.data header_with_hash
×
229
        |> Mina_block.Header.protocol_state
×
230
        |> Protocol_state.previous_state_hash
231
      in
232
      Transition_cache.add transition_graph ~parent:previous_state_hash
×
233
        (transition_cache_element, vc) ;
234
      (* TODO: Efficiently limiting the number of green threads in #1337 *)
235
      if
×
236
        worth_getting_root t
237
          (With_hash.map ~f:to_consensus_state header_with_hash)
×
238
      then (
×
239
        [%log trace] "Added the transition from sync_ledger_reader into cache"
×
240
          ~metadata:
241
            [ ( "state_hash"
242
              , State_hash.to_yojson
×
243
                  (State_hash.With_state_hashes.state_hash header_with_hash) )
×
244
            ; ( "header"
245
              , Mina_block.Header.to_yojson (With_hash.data header_with_hash) )
×
246
            ] ;
247

248
        Deferred.ignore_m
×
249
        @@ on_transition t ~sender ~root_sync_ledger header_with_hash )
×
250
      else Deferred.unit )
×
251

252
let external_transition_compare ~context:(module Context : CONTEXT) =
253
  let get_consensus_state =
×
254
    Fn.compose Protocol_state.consensus_state Mina_block.Header.protocol_state
255
  in
256
  Comparable.lift
×
257
    (fun existing candidate ->
258
      (* To prevent the logger to spam a lot of messages, the logger input is set to null *)
259
      if
×
260
        State_hash.equal
261
          (State_hash.With_state_hashes.state_hash existing)
×
262
          (State_hash.With_state_hashes.state_hash candidate)
×
263
      then 0
×
264
      else if
×
265
        Consensus.Hooks.equal_select_status `Keep
266
        @@ Consensus.Hooks.select ~context:(module Context) ~existing ~candidate
267
      then -1
×
268
      else 1 )
×
269
    ~f:(With_hash.map ~f:get_consensus_state)
270

271
let download_snarked_ledger ~trust_system ~preferred_peers ~transition_graph
272
    ~sync_ledger_reader ~context t temp_snarked_ledger =
273
  time_deferred
×
274
    (let root_sync_ledger =
275
       Sync_ledger.Any_ledger.create
NEW
276
         (Ledger.Root.as_unmasked temp_snarked_ledger)
×
277
         ~context ~trust_system
278
     in
279
     don't_wait_for
×
280
       (sync_ledger t ~preferred:preferred_peers ~root_sync_ledger
×
281
          ~transition_graph ~sync_ledger_reader ) ;
282
     (* We ignore the resulting ledger returned here since it will always
283
        * be the same as the ledger we started with because we are syncing
284
        * a db ledger. *)
NEW
285
     let%map _, data = Sync_ledger.Any_ledger.valid_tree root_sync_ledger in
×
NEW
286
     Sync_ledger.Any_ledger.destroy root_sync_ledger ;
×
UNCOV
287
     data )
×
288

289
(** Run one bootstrap cycle *)
290
let run_cycle ~context:(module Context : CONTEXT) ~trust_system ~verifier
291
    ~network ~consensus_local_state ~network_transition_pipe ~preferred_peers
292
    ~persistent_root ~persistent_frontier ~initial_root_transition ~catchup_mode
293
    previous_cycles =
294
  let open Context in
×
295
  (* The short-lived pipe allocated here will be closed
296
     when a follow-up pipe is allocated: in the next cycle of bootstrap
297
     or when controll is passed to the transition frontier controller.staged_ledger_construction_time
298
     Because [Choosable_synchronous_pipe.t] is used, no data will be lost: any
299
     successful read is followed by mom-blocking handling, and if the read/write
300
     pair is not consumed, it will continue in a follow-up pipe allocated in the
301
     next call to [Swappable.swap_reader].
302
  *)
303
  let%bind sync_ledger_reader = Swappable.swap_reader network_transition_pipe in
×
304
  let initial_root_transition =
×
305
    initial_root_transition |> Mina_block.Validated.remember
×
306
    |> Mina_block.Validation.reset_frontier_dependencies_validation
×
307
    |> Mina_block.Validation.reset_staged_ledger_diff_validation
308
  in
309
  let t =
×
310
    { network
311
    ; context = (module Context)
312
    ; trust_system
313
    ; verifier
314
    ; best_seen_transition = initial_root_transition
315
    ; current_root = initial_root_transition
316
    ; num_of_root_snarked_ledger_retargeted = 0
317
    }
318
  in
319
  let transition_graph = Transition_cache.create () in
320
  let temp_persistent_root_instance =
×
321
    Transition_frontier.Persistent_root.create_instance_exn persistent_root
322
  in
323
  let temp_snarked_ledger =
×
324
    Transition_frontier.Persistent_root.Instance.snarked_ledger
325
      temp_persistent_root_instance
326
  in
327
  (* step 1. download snarked_ledger *)
328
  let%bind sync_ledger_time, (hash, sender, expected_staged_ledger_hash) =
329
    download_snarked_ledger
×
330
      ~context:(module Context)
331
      ~trust_system ~preferred_peers ~transition_graph ~sync_ledger_reader t
332
      temp_snarked_ledger
333
  in
334
  Mina_metrics.(
×
335
    Counter.inc Bootstrap.root_snarked_ledger_sync_ms
×
336
      Time.Span.(to_ms sync_ledger_time)) ;
×
337
  Mina_metrics.(
338
    Gauge.set Bootstrap.num_of_root_snarked_ledger_retargeted
×
339
      (Float.of_int t.num_of_root_snarked_ledger_retargeted)) ;
×
340
  (* step 2. Download scan state and pending coinbases. *)
341
  let%bind ( staged_ledger_data_download_time
342
           , staged_ledger_construction_time
343
           , staged_ledger_aux_result ) =
344
    let%bind ( staged_ledger_data_download_time
345
             , staged_ledger_data_download_result ) =
346
      time_deferred
×
347
        (Mina_networking.get_staged_ledger_aux_and_pending_coinbases_at_hash
×
348
           t.network sender.peer_id hash )
349
    in
350
    match staged_ledger_data_download_result with
×
351
    | Error err ->
×
352
        Deferred.return (staged_ledger_data_download_time, None, Error err)
353
    | Ok
×
354
        ( scan_state_uncached
355
        , expected_merkle_root
356
        , pending_coinbases
357
        , protocol_states ) -> (
358
        let%map staged_ledger_construction_result =
359
          O1trace.thread "construct_root_staged_ledger" (fun () ->
×
360
              let open Deferred.Or_error.Let_syntax in
×
361
              let received_staged_ledger_hash =
362
                Staged_ledger_hash.of_aux_ledger_and_coinbase_hash
363
                  (Staged_ledger.Scan_state.Stable.Latest.hash
×
364
                     scan_state_uncached )
365
                  expected_merkle_root pending_coinbases
366
              in
367
              [%log debug]
×
368
                ~metadata:
369
                  [ ( "expected_staged_ledger_hash"
370
                    , Staged_ledger_hash.to_yojson expected_staged_ledger_hash
×
371
                    )
372
                  ; ( "received_staged_ledger_hash"
373
                    , Staged_ledger_hash.to_yojson received_staged_ledger_hash
×
374
                    )
375
                  ]
376
                "Comparing $expected_staged_ledger_hash to \
377
                 $received_staged_ledger_hash" ;
378
              let%bind new_root =
379
                t.current_root
380
                |> Mina_block.Validation.skip_frontier_dependencies_validation
×
381
                     `This_block_belongs_to_a_detached_subtree
382
                |> Mina_block.Validation.validate_staged_ledger_hash
×
383
                     (`Staged_ledger_already_materialized
384
                       received_staged_ledger_hash )
385
                |> Result.map_error ~f:(fun _ ->
×
386
                       Error.of_string "received faulty scan state from peer" )
×
387
                |> Deferred.return
×
388
              in
389
              let protocol_states =
×
390
                List.map protocol_states
391
                  ~f:(With_hash.of_data ~hash_data:Protocol_state.hashes)
392
              in
393
              let scan_state =
×
394
                Staged_ledger.Scan_state.write_all_proofs_to_disk
395
                  ~proof_cache_db scan_state_uncached
396
              in
397
              let%bind protocol_states =
398
                Staged_ledger.Scan_state.check_required_protocol_states
×
399
                  scan_state ~protocol_states
400
                |> Deferred.return
×
401
              in
402
              let protocol_states_map =
×
403
                protocol_states
404
                |> List.map ~f:(fun ps ->
×
405
                       (State_hash.With_state_hashes.state_hash ps, ps) )
×
406
                |> State_hash.Map.of_alist_exn
407
              in
408
              let get_state hash =
×
409
                match Map.find protocol_states_map hash with
×
410
                | None ->
×
411
                    let new_state_hash =
412
                      State_hash.With_state_hashes.state_hash (fst new_root)
×
413
                    in
414
                    [%log error]
×
415
                      ~metadata:
416
                        [ ("new_root", State_hash.to_yojson new_state_hash)
×
417
                        ; ("state_hash", State_hash.to_yojson hash)
×
418
                        ]
419
                      "Protocol state (for scan state transactions) for \
420
                       $state_hash not found when bootstrapping to the new \
421
                       root $new_root" ;
422
                    Or_error.errorf
×
423
                      !"Protocol state (for scan state transactions) for \
×
424
                        %{sexp:State_hash.t} not found when bootstrapping to \
425
                        the new root %{sexp:State_hash.t}"
426
                      hash new_state_hash
427
                | Some protocol_state ->
×
428
                    Ok (With_hash.data protocol_state)
×
429
              in
430
              (* step 3. Construct staged ledger from snarked ledger, scan state
431
                 and pending coinbases. *)
432
              (* Construct the staged ledger before constructing the transition
433
               * frontier in order to verify the scan state we received.
434
               * TODO: reorganize the code to avoid doing this twice (#3480) *)
435
              let open Deferred.Let_syntax in
436
              let%map staged_ledger_construction_time, construction_result =
437
                time_deferred
×
438
                  (let open Deferred.Let_syntax in
439
                  let temp_mask = Ledger.Root.as_masked temp_snarked_ledger in
440
                  let%map result =
441
                    Staged_ledger
442
                    .of_scan_state_pending_coinbases_and_snarked_ledger ~logger
443
                      ~snarked_local_state:
444
                        Mina_block.(
445
                          t.current_root |> Validation.block |> header
×
446
                          |> Header.protocol_state
×
447
                          |> Protocol_state.blockchain_state
×
448
                          |> Blockchain_state.snarked_local_state)
×
449
                      ~verifier ~constraint_constants ~scan_state
450
                      ~snarked_ledger:temp_mask ~expected_merkle_root
451
                      ~pending_coinbases ~get_state
452
                  in
453
                  ignore
×
454
                    ( Ledger.Maskable.unregister_mask_exn ~loc:__LOC__ temp_mask
×
455
                      : Ledger.unattached_mask ) ;
456
                  Result.map result
457
                    ~f:
458
                      (const
×
459
                         ( scan_state
460
                         , pending_coinbases
461
                         , new_root
462
                         , protocol_states ) ))
463
              in
464
              Ok (staged_ledger_construction_time, construction_result) )
×
465
        in
466
        match staged_ledger_construction_result with
×
467
        | Error err ->
×
468
            (staged_ledger_data_download_time, None, Error err)
469
        | Ok (staged_ledger_construction_time, result) ->
×
470
            ( staged_ledger_data_download_time
471
            , Some staged_ledger_construction_time
472
            , result ) )
473
  in
474
  Transition_frontier.Persistent_root.Instance.close
×
475
    temp_persistent_root_instance ;
476
  match staged_ledger_aux_result with
×
477
  | Error e ->
×
478
      let%map () =
479
        Trust_system.(
480
          record t.trust_system logger sender
×
481
            Actions.
482
              ( Outgoing_connection_error
483
              , Some
484
                  ( "Can't find scan state from the peer or received faulty \
485
                     scan state from the peer."
486
                  , [] ) ))
487
      in
488
      [%log error]
×
489
        ~metadata:
490
          [ ("error", Error_json.error_to_yojson e)
×
491
          ; ("state_hash", State_hash.to_yojson hash)
×
492
          ; ( "expected_staged_ledger_hash"
493
            , Staged_ledger_hash.to_yojson expected_staged_ledger_hash )
×
494
          ]
495
        "Failed to find scan state for the transition with hash $state_hash \
496
         from the peer or received faulty scan state: $error. Retry bootstrap" ;
497
      let this_cycle =
×
498
        { cycle_result = "failed to download and construct scan state"
499
        ; sync_ledger_time
500
        ; staged_ledger_data_download_time
501
        ; staged_ledger_construction_time
502
        ; local_state_sync_required = false
503
        ; local_state_sync_time = None
504
        }
505
      in
506
      `Repeat (this_cycle :: previous_cycles)
507
  | Ok (scan_state, pending_coinbase, new_root, protocol_states) -> (
×
508
      let%bind () =
509
        Trust_system.(
510
          record t.trust_system logger sender
×
511
            Actions.
512
              ( Fulfilled_request
513
              , Some ("Received valid scan state from peer", []) ))
514
      in
515
      let best_seen_block_with_hash, _ = t.best_seen_transition in
×
516
      let consensus_state =
517
        With_hash.data best_seen_block_with_hash
×
518
        |> Mina_block.header |> Mina_block.Header.protocol_state
×
519
        |> Protocol_state.consensus_state
520
      in
521
      (* step 4. Synchronize consensus local state if necessary *)
522
      let%bind ( local_state_sync_time
523
               , (local_state_sync_required, local_state_sync_result) ) =
524
        time_deferred
×
525
          ( match
526
              Consensus.Hooks.required_local_state_sync
527
                ~constants:precomputed_values.consensus_constants
528
                ~consensus_state ~local_state:consensus_local_state
529
            with
530
          | None ->
×
531
              [%log debug]
×
532
                ~metadata:
533
                  [ ( "local_state"
534
                    , Consensus.Data.Local_state.to_yojson consensus_local_state
×
535
                    )
536
                  ; ( "consensus_state"
537
                    , Consensus.Data.Consensus_state.Value.to_yojson
×
538
                        consensus_state )
539
                  ]
540
                "Not synchronizing consensus local state" ;
541
              Deferred.return (false, Or_error.return ())
×
542
          | Some sync_jobs ->
×
543
              [%log info] "Synchronizing consensus local state" ;
×
544
              let%map result =
545
                Consensus.Hooks.sync_local_state
×
546
                  ~context:(module Context)
547
                  ~local_state:consensus_local_state ~trust_system
548
                  ~glue_sync_ledger:(Mina_networking.glue_sync_ledger t.network)
×
549
                  sync_jobs
550
              in
551
              (true, result) )
×
552
      in
553
      match local_state_sync_result with
×
554
      | Error e ->
×
555
          [%log error]
×
556
            ~metadata:[ ("error", Error_json.error_to_yojson e) ]
×
557
            "Local state sync failed: $error. Retry bootstrap" ;
558
          let this_cycle =
×
559
            { cycle_result = "failed to synchronize local state"
560
            ; sync_ledger_time
561
            ; staged_ledger_data_download_time
562
            ; staged_ledger_construction_time
563
            ; local_state_sync_required
564
            ; local_state_sync_time = Some local_state_sync_time
565
            }
566
          in
567
          Deferred.return (`Repeat (this_cycle :: previous_cycles))
568
      | Ok () ->
×
569
          (* step 5. Close the old frontier and reload a new one from disk. *)
570
          let new_root_data : Transition_frontier.Root_data.Limited.t =
571
            Transition_frontier.Root_data.Limited.create
572
              ~transition:(Mina_block.Validated.lift new_root)
×
573
              ~scan_state ~pending_coinbase ~protocol_states
574
          in
575
          let%bind () =
576
            Transition_frontier.Persistent_frontier.reset_database_exn
×
577
              persistent_frontier ~root_data:new_root_data
578
              ~genesis_state_hash:
579
                (State_hash.With_state_hashes.state_hash
×
580
                   precomputed_values.protocol_state_with_hashes )
581
          in
582
          (* TODO: lazy load db in persistent root to avoid unnecessary opens like this *)
583
          Transition_frontier.Persistent_root.(
×
584
            with_instance_exn persistent_root ~f:(fun instance ->
×
585
                Instance.set_root_state_hash instance
×
586
                @@ Mina_block.Validated.state_hash
×
587
                @@ Mina_block.Validated.lift new_root )) ;
×
588
          let%map new_frontier =
589
            let fail msg =
590
              failwith
×
591
                ( "failed to initialize transition frontier after \
592
                   bootstrapping: " ^ msg )
593
            in
594
            Transition_frontier.load
×
595
              ~context:(module Context)
596
              ~retry_with_fresh_db:false ~verifier ~consensus_local_state
597
              ~persistent_root ~persistent_frontier ~catchup_mode ()
598
            >>| function
×
599
            | Ok frontier ->
×
600
                frontier
601
            | Error (`Failure msg) ->
×
602
                fail msg
603
            | Error `Bootstrap_required ->
×
604
                fail
605
                  "bootstrap still required (indicates logical error in code)"
606
            | Error `Persistent_frontier_malformed ->
×
607
                fail "persistent frontier was malformed"
608
            | Error `Snarked_ledger_mismatch ->
×
609
                fail
610
                  "this should not happen, because we just reset the \
611
                   snarked_ledger"
612
          in
613
          [%str_log info] Bootstrap_complete ;
×
614
          let collected_transitions = Transition_cache.data transition_graph in
×
615
          let logger =
×
616
            Logger.extend logger
617
              [ ("context", `String "Filter collected transitions in bootstrap")
618
              ]
619
          in
620
          let root_consensus_state =
×
621
            Transition_frontier.(
622
              Breadcrumb.consensus_state_with_hashes (root new_frontier))
×
623
          in
624
          let filtered_collected_transitions =
625
            List.filter collected_transitions
626
              ~f:(fun (incoming_transition, _) ->
627
                let transition =
×
628
                  Envelope.Incoming.data incoming_transition
×
629
                  |> Transition_cache.header_with_hash
630
                in
631
                Consensus.Hooks.equal_select_status `Take
×
632
                @@ Consensus.Hooks.select
633
                     ~context:(module Context)
634
                     ~existing:root_consensus_state
635
                     ~candidate:
636
                       (With_hash.map
×
637
                          ~f:
638
                            (Fn.compose Protocol_state.consensus_state
×
639
                               Mina_block.Header.protocol_state )
640
                          transition ) )
641
          in
642
          [%log debug] "Sorting filtered transitions by consensus state"
×
643
            ~metadata:[] ;
644
          let sorted_filtered_collected_transitions =
×
645
            O1trace.sync_thread "sorting_collected_transitions" (fun () ->
646
                List.sort filtered_collected_transitions
×
647
                  ~compare:
648
                    (Comparable.lift
×
649
                       ~f:(fun (x, _) ->
650
                         Transition_cache.header_with_hash
×
651
                         @@ Envelope.Incoming.data x )
×
652
                       (external_transition_compare ~context:(module Context)) ) )
653
          in
654
          let this_cycle =
×
655
            { cycle_result = "success"
656
            ; sync_ledger_time
657
            ; staged_ledger_data_download_time
658
            ; staged_ledger_construction_time
659
            ; local_state_sync_required
660
            ; local_state_sync_time = Some local_state_sync_time
661
            }
662
          in
663
          `Finished
664
            ( this_cycle :: previous_cycles
665
            , (new_frontier, sorted_filtered_collected_transitions) ) )
666

667
(** The entry point function for bootstrap controller. When bootstrap finished
668
    it would return a transition frontier with the root breadcrumb and a list
669
    of transitions collected during bootstrap.
670

671
    Bootstrap controller would do the following steps to contrust the
672
    transition frontier:
673
    1. Download the root snarked_ledger.
674
    2. Download the scan state and pending coinbases.
675
    3. Construct the staged ledger from the snarked ledger, scan state and
676
       pending coinbases.
677
    4. Synchronize the consensus local state if necessary.
678
    5. Close the old frontier and reload a new one from disk.
679
 *)
680
let run ~context:(module Context : CONTEXT) ~trust_system ~verifier ~network
681
    ~consensus_local_state ~network_transition_pipe ~preferred_peers
682
    ~persistent_root ~persistent_frontier ~initial_root_transition ~catchup_mode
683
    =
684
  let open Context in
×
685
  let run_cycle =
686
    run_cycle
687
      ~context:(module Context : CONTEXT)
688
      ~trust_system ~verifier ~network ~consensus_local_state
689
      ~network_transition_pipe ~preferred_peers ~persistent_root
690
      ~persistent_frontier ~initial_root_transition ~catchup_mode
691
  in
692
  O1trace.thread "bootstrap"
693
  @@ fun () ->
694
  let%map time_elapsed, (cycles, result) =
695
    time_deferred @@ Deferred.repeat_until_finished [] run_cycle
×
696
  in
697
  [%log info] "Bootstrap completed in $time_elapsed: $bootstrap_stats"
×
698
    ~metadata:
699
      [ ("time_elapsed", time_to_yojson time_elapsed)
×
700
      ; ( "bootstrap_stats"
701
        , `List (List.map ~f:bootstrap_cycle_stats_to_yojson cycles) )
×
702
      ] ;
703
  Mina_metrics.(
×
704
    Gauge.set Bootstrap.bootstrap_time_ms Core.Time.(Span.to_ms @@ time_elapsed)) ;
×
705
  result
706

707
let%test_module "Bootstrap_controller tests" =
708
  ( module struct
709
    let max_frontier_length =
710
      Transition_frontier.global_max_length Genesis_constants.For_unit_tests.t
×
711

712
    let logger = Logger.null ()
×
713

714
    let () =
715
      (* Disable log messages from best_tip_diff logger. *)
716
      Logger.Consumer_registry.register ~commit_id:""
×
717
        ~id:Logger.Logger_id.best_tip_diff ~processor:(Logger.Processor.raw ())
×
718
        ~transport:
719
          (Logger.Transport.create
×
720
             ( module struct
721
               type t = unit
722

723
               let transport () _ = ()
×
724
             end )
725
             () )
726
        ()
727

728
    let trust_system =
729
      let s = Trust_system.null () in
730
      don't_wait_for
×
731
        (Pipe_lib.Strict_pipe.Reader.iter
×
732
           (Trust_system.upcall_pipe s)
×
733
           ~f:(const Deferred.unit) ) ;
×
734
      s
×
735

736
    let precomputed_values = Lazy.force Precomputed_values.for_unit_tests
×
737

738
    let proof_level = precomputed_values.proof_level
739

740
    let constraint_constants = precomputed_values.constraint_constants
741

742
    let ledger_sync_config =
743
      Syncable_ledger.create_config
×
744
        ~compile_config:Mina_compile_config.For_unit_tests.t
745
        ~max_subtree_depth:None ~default_subtree_depth:None ()
746

747
    module Context = struct
748
      let logger = logger
749

750
      let precomputed_values = precomputed_values
751

752
      let constraint_constants =
753
        Genesis_constants.For_unit_tests.Constraint_constants.t
754

755
      let consensus_constants = precomputed_values.consensus_constants
756

757
      let ledger_sync_config =
758
        Syncable_ledger.create_config
×
759
          ~compile_config:Mina_compile_config.For_unit_tests.t
760
          ~max_subtree_depth:None ~default_subtree_depth:None ()
761

762
      let proof_cache_db = Proof_cache_tag.For_tests.create_db ()
×
763
    end
764

765
    let verifier =
766
      Async.Thread_safe.block_on_async_exn (fun () ->
×
767
          Verifier.For_tests.default ~constraint_constants ~logger ~proof_level
×
768
            () )
769

770
    module Genesis_ledger = (val precomputed_values.genesis_ledger)
771

772
    let downcast_transition ~sender transition =
773
      let transition =
×
774
        transition |> Mina_block.Validated.remember
×
775
        |> Mina_block.Validation.reset_frontier_dependencies_validation
×
776
        |> Mina_block.Validation.reset_staged_ledger_diff_validation
777
      in
778
      Envelope.Incoming.wrap ~data:transition
×
779
        ~sender:(Envelope.Sender.Remote sender)
780

781
    let downcast_breadcrumb ~sender breadcrumb =
782
      downcast_transition ~sender
×
783
        (Transition_frontier.Breadcrumb.validated_transition breadcrumb)
×
784

785
    let make_non_running_bootstrap ~genesis_root ~network =
786
      let transition =
×
787
        genesis_root
788
        |> Mina_block.Validation.reset_frontier_dependencies_validation
×
789
        |> Mina_block.Validation.reset_staged_ledger_diff_validation
790
      in
791
      { context = (module Context)
×
792
      ; trust_system
793
      ; verifier
794
      ; best_seen_transition = transition
795
      ; current_root = transition
796
      ; network
797
      ; num_of_root_snarked_ledger_retargeted = 0
798
      }
799

800
    let%test_unit "Bootstrap controller caches all transitions it is passed \
801
                   through the transition_reader" =
802
      let branch_size = (max_frontier_length * 2) + 2 in
×
803
      Quickcheck.test ~trials:1
804
        (let open Quickcheck.Generator.Let_syntax in
805
        (* we only need one node for this test, but we need more than one peer so that mina_networking does not throw an error *)
806
        let%bind fake_network =
807
          Fake_network.Generator.(
808
            gen ~precomputed_values ~verifier ~max_frontier_length
×
809
              ~ledger_sync_config [ fresh_peer; fresh_peer ])
810
        in
811
        let%map make_branch =
812
          Transition_frontier.Breadcrumb.For_tests.gen_seq ~precomputed_values
×
813
            ~verifier
814
            ~accounts_with_secret_keys:(Lazy.force Genesis_ledger.accounts)
×
815
            branch_size
816
        in
817
        let [ me; _ ] = fake_network.peer_networks in
×
818
        let branch =
819
          Async.Thread_safe.block_on_async_exn (fun () ->
820
              make_branch (Transition_frontier.root me.state.frontier) )
×
821
        in
822
        (fake_network, branch))
×
823
        ~f:(fun (fake_network, branch) ->
824
          let [ me; other ] = fake_network.peer_networks in
×
825
          let genesis_root =
826
            Transition_frontier.(
827
              Breadcrumb.validated_transition @@ root me.state.frontier)
×
828
            |> Mina_block.Validated.remember
829
          in
830
          let transition_graph = Transition_cache.create () in
×
831
          let sync_ledger_reader, sync_ledger_writer =
×
832
            Pipe_lib.Choosable_synchronous_pipe.create ()
833
          in
834
          let bootstrap =
×
835
            make_non_running_bootstrap ~genesis_root ~network:me.network
836
          in
837
          let root_sync_ledger =
838
            Sync_ledger.Any_ledger.create
NEW
839
              ( Ledger.Root.as_unmasked
×
NEW
840
              @@ Transition_frontier.root_snarked_ledger me.state.frontier )
×
841
              ~context:(module Context)
842
              ~trust_system
843
          in
844
          Async.Thread_safe.block_on_async_exn (fun () ->
×
845
              let sync_deferred =
×
846
                sync_ledger bootstrap ~root_sync_ledger ~transition_graph
847
                  ~preferred:[] ~sync_ledger_reader
848
              in
849
              let%bind sync_ledger_writer' =
850
                Deferred.List.fold ~init:sync_ledger_writer branch
×
851
                  ~f:(fun sync_ledger_writer breadcrumb ->
852
                    Pipe_lib.Choosable_synchronous_pipe.write sync_ledger_writer
×
853
                      ( `Block
854
                          (downcast_breadcrumb ~sender:other.peer breadcrumb)
×
855
                      , `Valid_cb None ) )
856
              in
857
              Pipe_lib.Choosable_synchronous_pipe.close sync_ledger_writer' ;
×
858
              sync_deferred ) ;
×
859
          let expected_transitions =
×
860
            List.map branch
861
              ~f:
862
                (Fn.compose
×
863
                   (With_hash.map ~f:Mina_block.header)
864
                   (Fn.compose Mina_block.Validation.block_with_hash
×
865
                      (Fn.compose Mina_block.Validated.remember
×
866
                         Transition_frontier.Breadcrumb.validated_transition ) ) )
867
          in
868
          let saved_transitions =
×
869
            Transition_cache.data transition_graph
×
870
            |> List.map ~f:(fun (x, _) ->
871
                   Transition_cache.header_with_hash @@ Envelope.Incoming.data x )
×
872
          in
873
          let module E = struct
×
874
            module T = struct
875
              type t = Mina_block.Header.t State_hash.With_state_hashes.t
×
876
              [@@deriving sexp]
877

878
              let compare = external_transition_compare ~context:(module Context)
879
            end
880

881
            include Comparable.Make (T)
882
          end in
883
          [%test_result: E.Set.t]
×
884
            (E.Set.of_list saved_transitions)
×
885
            ~expect:(E.Set.of_list expected_transitions) )
×
886

887
    let run_bootstrap ~timeout_duration ~my_net ~network_transition_pipe =
888
      let open Fake_network in
×
889
      let time_controller = Block_time.Controller.basic ~logger in
890
      let persistent_root =
891
        Transition_frontier.persistent_root my_net.state.frontier
892
      in
893
      let persistent_frontier =
×
894
        Transition_frontier.persistent_frontier my_net.state.frontier
895
      in
896
      let initial_root_transition =
×
897
        Transition_frontier.(
898
          Breadcrumb.validated_transition (root my_net.state.frontier))
×
899
      in
900
      let%bind () =
901
        Transition_frontier.close ~loc:__LOC__ my_net.state.frontier
×
902
      in
903
      [%log info] "bootstrap begin" ;
×
904
      Block_time.Timeout.await_exn time_controller ~timeout_duration
×
905
        (run
906
           ~context:(module Context)
907
           ~trust_system ~verifier ~network:my_net.network ~preferred_peers:[]
908
           ~consensus_local_state:my_net.state.consensus_local_state
909
           ~network_transition_pipe ~persistent_root ~persistent_frontier
910
           ~catchup_mode:`Super ~initial_root_transition )
911

912
    let assert_transitions_increasingly_sorted ~root
913
        (incoming_transitions : Transition_cache.element list) =
914
      let root =
×
915
        Transition_frontier.Breadcrumb.block root |> Mina_block.header
×
916
      in
917
      ignore
×
918
        ( List.fold_result ~init:root incoming_transitions
×
919
            ~f:(fun max_acc incoming_transition ->
920
              let With_hash.{ data = header; _ } =
×
921
                Transition_cache.header_with_hash
922
                  (Envelope.Incoming.data @@ fst incoming_transition)
×
923
              in
924
              let header_len h =
×
925
                Mina_block.Header.protocol_state h
×
926
                |> Protocol_state.consensus_state
×
927
                |> Consensus.Data.Consensus_state.blockchain_length
928
              in
929
              let open Result.Let_syntax in
930
              let%map () =
931
                Result.ok_if_true
×
932
                  Mina_numbers.Length.(header_len max_acc <= header_len header)
×
933
                  ~error:
934
                    (Error.of_string
×
935
                       "The blocks are not sorted in increasing order" )
936
              in
937
              header )
×
938
          |> Or_error.ok_exn
×
939
          : Mina_block.Header.t )
940

941
    let%test_unit "sync with one node after receiving a transition" =
942
      Quickcheck.test ~trials:1
×
943
        Fake_network.Generator.(
944
          gen ~precomputed_values ~verifier ~max_frontier_length
×
945
            ~ledger_sync_config
946
            [ fresh_peer
947
            ; peer_with_branch
948
                ~frontier_branch_size:((max_frontier_length * 2) + 2)
949
            ])
950
        ~f:(fun fake_network ->
951
          let [ my_net; peer_net ] = fake_network.peer_networks in
×
952
          let block =
953
            Envelope.Incoming.wrap
954
              ~data:
955
                ( Transition_frontier.best_tip peer_net.state.frontier
×
956
                |> Transition_frontier.Breadcrumb.validated_transition
×
957
                |> Mina_block.Validated.remember
×
958
                |> Mina_block.Validation.reset_frontier_dependencies_validation
×
959
                |> Mina_block.Validation.reset_staged_ledger_diff_validation )
×
960
              ~sender:(Envelope.Sender.Remote peer_net.peer)
961
          in
962
          let network_transition_pipe =
963
            Swappable.create ~name:(__MODULE__ ^ __LOC__)
964
              (Buffered (`Capacity 10, `Overflow (Drop_head ignore)))
965
          in
966
          Swappable.write network_transition_pipe (`Block block, `Valid_cb None) ;
×
967
          let new_frontier, sorted_external_transitions =
×
968
            Async.Thread_safe.block_on_async_exn (fun () ->
969
                run_bootstrap
×
970
                  ~timeout_duration:(Block_time.Span.of_ms 30_000L)
×
971
                  ~my_net ~network_transition_pipe )
972
          in
973
          assert_transitions_increasingly_sorted
×
974
            ~root:(Transition_frontier.root new_frontier)
×
975
            sorted_external_transitions ;
976
          [%test_result: Ledger_hash.t]
×
NEW
977
            ( Ledger.Root.merkle_root
×
978
            @@ Transition_frontier.root_snarked_ledger new_frontier )
×
979
            ~expect:
NEW
980
              ( Ledger.Root.merkle_root
×
981
              @@ Transition_frontier.root_snarked_ledger peer_net.state.frontier
×
982
              ) )
983

984
    let%test_unit "reconstruct staged_ledgers using \
985
                   of_scan_state_and_snarked_ledger" =
986
      Quickcheck.test ~trials:1
×
987
        (Transition_frontier.For_tests.gen ~precomputed_values ~verifier
×
988
           ~max_length:max_frontier_length ~size:max_frontier_length () )
989
        ~f:(fun frontier ->
990
          Thread_safe.block_on_async_exn
×
991
          @@ fun () ->
992
          Deferred.List.iter (Transition_frontier.all_breadcrumbs frontier)
×
993
            ~f:(fun breadcrumb ->
994
              let staged_ledger =
×
995
                Transition_frontier.Breadcrumb.staged_ledger breadcrumb
996
              in
997
              let expected_merkle_root =
×
998
                Staged_ledger.ledger staged_ledger |> Ledger.merkle_root
×
999
              in
1000
              let snarked_ledger =
×
1001
                Transition_frontier.root_snarked_ledger frontier
×
1002
                |> Ledger.Root.as_masked
1003
              in
1004
              let snarked_local_state =
×
1005
                Transition_frontier.root frontier
×
1006
                |> Transition_frontier.Breadcrumb.protocol_state
×
1007
                |> Protocol_state.blockchain_state
×
1008
                |> Blockchain_state.snarked_local_state
1009
              in
1010
              let scan_state = Staged_ledger.scan_state staged_ledger in
×
1011
              let get_state hash =
×
1012
                match Transition_frontier.find_protocol_state frontier hash with
×
1013
                | Some protocol_state ->
×
1014
                    Ok protocol_state
1015
                | None ->
×
1016
                    Or_error.errorf
1017
                      !"Protocol state (for scan state transactions) for \
×
1018
                        %{sexp:State_hash.t} not found"
1019
                      hash
1020
              in
1021
              let pending_coinbases =
1022
                Staged_ledger.pending_coinbase_collection staged_ledger
1023
              in
1024
              let%map actual_staged_ledger =
1025
                Staged_ledger.of_scan_state_pending_coinbases_and_snarked_ledger
1026
                  ~scan_state ~logger ~verifier ~constraint_constants
1027
                  ~snarked_ledger ~snarked_local_state ~expected_merkle_root
1028
                  ~pending_coinbases ~get_state
1029
                |> Deferred.Or_error.ok_exn
×
1030
              in
1031
              let height =
×
1032
                Transition_frontier.Breadcrumb.consensus_state breadcrumb
×
1033
                |> Consensus.Data.Consensus_state.blockchain_length
×
1034
                |> Mina_numbers.Length.to_int
1035
              in
1036
              [%test_eq: Staged_ledger_hash.t]
×
1037
                ~message:
1038
                  (sprintf "mismatch of staged ledger hash height %d" height)
×
1039
                (Transition_frontier.Breadcrumb.staged_ledger_hash breadcrumb)
×
1040
                (Staged_ledger.hash actual_staged_ledger) ) )
×
1041

1042
    (*
1043
    let%test_unit "if we see a new transition that is better than the \
1044
                   transition that we are syncing from, than we should \
1045
                   retarget our root" =
1046
      Quickcheck.test ~trials:1
1047
        Fake_network.Generator.(
1048
          gen ~max_frontier_length
1049
            [ fresh_peer
1050
            ; peer_with_branch ~frontier_branch_size:max_frontier_length
1051
            ; peer_with_branch
1052
                ~frontier_branch_size:((max_frontier_length * 2) + 2) ])
1053
        ~f:(fun fake_network ->
1054
          let [me; weaker_chain; stronger_chain] =
1055
            fake_network.peer_networks
1056
          in
1057
          let transition_reader, transition_writer =
1058
            Pipe_lib.Strict_pipe.create ~name:(__MODULE__ ^ __LOC__)
1059
              (Buffered (`Capacity 10, `Overflow Drop_head))
1060
          in
1061
          Envelope.Incoming.wrap
1062
            ~data:
1063
              ( Transition_frontier.best_tip weaker_chain.state.frontier
1064
              |> Transition_frontier.Breadcrumb.validated_transition
1065
              |> Mina_block.Validated.to_initial_validated )
1066
            ~sender:
1067
              (Envelope.Sender.Remote
1068
                 (weaker_chain.peer.host, weaker_chain.peer.peer_id))
1069
          |> Pipe_lib.Strict_pipe.Writer.write transition_writer ;
1070
          Envelope.Incoming.wrap
1071
            ~data:
1072
              ( Transition_frontier.best_tip stronger_chain.state.frontier
1073
              |> Transition_frontier.Breadcrumb.validated_transition
1074
              |> Mina_block.Validated.to_initial_validated )
1075
            ~sender:
1076
              (Envelope.Sender.Remote
1077
                 (stronger_chain.peer.host, stronger_chain.peer.peer_id))
1078
          |> Pipe_lib.Strict_pipe.Writer.write transition_writer ;
1079
          let new_frontier, sorted_external_transitions =
1080
            Async.Thread_safe.block_on_async_exn (fun () ->
1081
                run_bootstrap
1082
                  ~timeout_duration:(Block_time.Span.of_ms 60_000L)
1083
                  ~my_net:me ~transition_reader )
1084
          in
1085
          assert_transitions_increasingly_sorted
1086
            ~root:(Transition_frontier.root new_frontier)
1087
            sorted_external_transitions ;
1088
          [%test_result: Ledger_hash.t]
1089
            ( Ledger.Db.merkle_root
1090
            @@ Transition_frontier.root_snarked_ledger new_frontier )
1091
            ~expect:
1092
              ( Ledger.Db.merkle_root
1093
              @@ Transition_frontier.root_snarked_ledger
1094
                   stronger_chain.state.frontier ) )
1095
*)
1096
  end )
88✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc