• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

MinaProtocol / mina / 3001

01 Dec 2024 06:25PM UTC coverage: 36.257% (-24.4%) from 60.697%
3001

push

buildkite

web-flow
Merge pull request #16393 from leopardracer/patch-1

fix: typos in documentation files

25609 of 70631 relevant lines covered (36.26%)

27168.12 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.82
/src/lib/bootstrap_controller/bootstrap_controller.ml
1
(* Only show stdout for failed inline tests. *)
5✔
2
open Core
3
open Async
4
open Mina_base
5
module Ledger = Mina_ledger.Ledger
6
module Sync_ledger = Mina_ledger.Sync_ledger
7
open Mina_state
8
open Pipe_lib.Strict_pipe
9
open Network_peer
10
module Transition_cache = Transition_cache
11

12
(** Environment the bootstrap controller is parameterised over. It is passed
    to [run] as a first-class module and stored in the [context] field of
    [t]. *)
module type CONTEXT = sig
13
  (* Logger used for the [%log ...] statements in this file and handed to
     [Trust_system.record]. *)
  val logger : Logger.t
14

15
  (* This file reads [compile_config], [consensus_constants] and
     [protocol_state_with_hashes] from this value. *)
  val precomputed_values : Precomputed_values.t
16

17
  (* Forwarded to the staged-ledger reconstruction performed in [run]. *)
  val constraint_constants : Genesis_constants.Constraint_constants.t
18

19
  val consensus_constants : Consensus.Constants.t
20
end
21

22
(** Structured log event with the message "Bootstrap state: complete.".
    [run] emits it at info level after the new transition frontier has been
    loaded. *)
type Structured_log_events.t += Bootstrap_complete
23
  [@@deriving register_event { msg = "Bootstrap state: complete." }]
3✔
24

25
(** State of one bootstrap cycle. The mutable fields are updated by
    [start_sync_job_with_peer] each time a peer supplies a verified root and
    best tip. *)
type t =
26
  (* Ambient logger and constants, unpacked at the top of most functions. *)
  { context : (module CONTEXT)
27
  (* Receives [Trust_system.record] calls for fulfilled requests and bad
     proofs. *)
  ; trust_system : Trust_system.t
28
  (* Handed to [Sync_handler.Root.verify] in [on_transition]. *)
  ; verifier : Verifier.t
29
  (* Best tip received so far; the [~existing] side of the comparison in
     [worth_getting_root]. *)
  ; mutable best_seen_transition : Mina_block.initial_valid_block
30
  (* Root whose snarked ledger hash is the current sync target. *)
  ; mutable current_root : Mina_block.initial_valid_block
31
  (* Used for [get_ancestry] and [glue_sync_ledger]. *)
  ; network : Mina_networking.t
32
  (* Incremented each time [Sync_ledger.Db.new_goal] answers [`New]. *)
  ; mutable num_of_root_snarked_ledger_retargeted : int
33
  }
34

35
(** Duration type used by the timing fields of [bootstrap_cycle_stats]. *)
type time = Time.Span.t

(** JSON encoder for [time]: renders the span as the string
    ["<seconds as %f> seconds"]. *)
let time_to_yojson span =
  let seconds = Time.Span.to_sec span in
  `String (Printf.sprintf "%f seconds" seconds)
×
39

40
(** A [time] that may be absent, e.g. for a bootstrap step that did not run. *)
type opt_time = time option

(** JSON encoder for [opt_time]: [None] becomes [`Null], [Some span] is
    encoded with [time_to_yojson]. *)
let opt_time_to_yojson maybe_span =
  Option.value_map maybe_span ~default:`Null ~f:time_to_yojson
47

48
(** Metrics collected for a single cycle of the bootstrap controller. [run]
    builds one value per cycle, whether the cycle succeeded or failed, and
    prepends it to the list of previous cycles. Serialised to JSON through
    [time_to_yojson] and [opt_time_to_yojson]. *)
49
type bootstrap_cycle_stats =
×
50
  (* Human-readable outcome, e.g. "success" or
     "failed to synchronize local state". *)
  { cycle_result : string
×
51
  (* Time spent syncing the root snarked ledger (step 1). *)
  ; sync_ledger_time : time
×
52
  (* Time spent downloading the scan state and pending coinbases (step 2). *)
  ; staged_ledger_data_download_time : time
×
53
  (* [None] when the download failed or construction stopped with an error
     before being timed. *)
  ; staged_ledger_construction_time : opt_time
×
54
  (* Whether consensus local state had to be synchronised (step 4). *)
  ; local_state_sync_required : bool
×
55
  (* [None] when the cycle failed before the local-state step. *)
  ; local_state_sync_time : opt_time
×
56
  }
57
[@@deriving to_yojson]
58

59
(** [time_deferred d] waits for [d] and returns [(elapsed, result)], where
    [elapsed] is the time between this call and the moment [d] became
    determined. The clock starts when this function is called, not when the
    work behind [d] was started. *)
let time_deferred deferred =
  let started_at = Time.now () in
  deferred >>| fun value -> (Time.diff (Time.now ()) started_at, value)
×
64

65
(** [worth_getting_root t candidate] is [true] exactly when consensus
    selection answers [`Take] for [candidate] against the consensus state of
    [t.best_seen_transition]. The logger given to the selection is tagged with
    a ["selection_context"] entry naming this function. *)
let worth_getting_root ({ context = (module Context); _ } as t) candidate =
  let module Consensus_context = struct
    include Context

    let compile_config = precomputed_values.compile_config

    let logger =
      Logger.extend logger
        [ ( "selection_context"
          , `String "Bootstrap_controller.worth_getting_root" )
        ]
  end in
  let existing =
    With_hash.map ~f:Mina_block.consensus_state
      (Mina_block.Validation.block_with_hash t.best_seen_transition)
  in
  let verdict =
    Consensus.Hooks.select
      ~context:(module Consensus_context)
      ~existing ~candidate
  in
  Consensus.Hooks.equal_select_status `Take verdict
84

85
(** Record a [Violated_protocol] trust action against [host] for an ancestry
    proof that failed verification; [e] is attached as the ["error"] metadata.
    Returns the deferred produced by [Trust_system.record]. *)
let received_bad_proof ({ context = (module Context); _ } as t) host e =
  let metadata = [ ("error", Error_json.error_to_yojson e) ] in
  let violation =
    Trust_system.Actions.
      (Violated_protocol, Some ("Bad ancestor proof: $error", metadata))
  in
  Trust_system.record t.trust_system Context.logger host violation
×
94

95
(** [true] once [Sync_ledger.Db.peek_valid_tree] yields a tree for
    [root_sync_ledger]. *)
let done_syncing_root root_sync_ledger =
  match Sync_ledger.Db.peek_valid_tree root_sync_ledger with
  | Some _ ->
      true
  | None ->
      false
×
97

98
(** A candidate is worth acting on only while the root ledger sync is still
    running and [worth_getting_root] accepts it. The consensus comparison is
    skipped once the sync has finished. *)
let should_sync ~root_sync_ledger t candidate_state =
  if done_syncing_root root_sync_ledger then false
  else worth_getting_root t candidate_state
×
101

102
(** Retarget the root ledger sync at a verified root received from [sender].

    A [Fulfilled_request] trust action is recorded for [sender] first. Then
    [best_seen_transition] and [current_root] are overwritten with
    [peer_best_tip] and [peer_root], and [Sync_ledger.Db.new_goal] is given the
    root's snarked ledger hash as target, with
    [(root state hash, sender, expected staged ledger hash)] as attached data.
    Two data values count as equal when their state hashes are equal.

    The result mirrors the answer of [new_goal]:
    - [`New] gives [`Syncing_new_snarked_ledger], and the retarget counter is
      incremented;
    - [`Update_data] gives [`Updating_root_transition];
    - [`Repeat] gives [`Ignored]. *)
let start_sync_job_with_peer ~sender ~root_sync_ledger
    ({ context = (module Context); _ } as t) peer_best_tip peer_root =
  let fulfilled =
    Trust_system.Actions.
      (Fulfilled_request, Some ("Received verified peer root and best tip", []))
  in
  let%bind () =
    Trust_system.record t.trust_system Context.logger sender fulfilled
  in
  t.best_seen_transition <- peer_best_tip ;
  t.current_root <- peer_root ;
  let root_blockchain_state =
    Protocol_state.blockchain_state
      (Mina_block.Header.protocol_state
         (Mina_block.header (Mina_block.Validation.block t.current_root)) )
  in
  let expected_staged_ledger_hash =
    Blockchain_state.staged_ledger_hash root_blockchain_state
  in
  let sync_target =
    Frozen_ledger_hash.to_ledger_hash
      (Blockchain_state.snarked_ledger_hash root_blockchain_state)
  in
  let root_state_hash =
    State_hash.With_state_hashes.state_hash
      (Mina_block.Validation.block_with_hash t.current_root)
  in
  let same_root (hash1, _, _) (hash2, _, _) = State_hash.equal hash1 hash2 in
  let outcome =
    match
      Sync_ledger.Db.new_goal root_sync_ledger sync_target
        ~data:(root_state_hash, sender, expected_staged_ledger_hash)
        ~equal:same_root
    with
    | `New ->
        t.num_of_root_snarked_ledger_retargeted <-
          t.num_of_root_snarked_ledger_retargeted + 1 ;
        `Syncing_new_snarked_ledger
    | `Update_data ->
        `Updating_root_transition
    | `Repeat ->
        `Ignored
  in
  return outcome
145

146
(** Consensus state stored in the protocol state of header [h]. *)
let to_consensus_state h =
  Protocol_state.consensus_state (Mina_block.Header.protocol_state h)
×
148

149
(** React to a header received while the root snarked ledger is syncing.

    The header's consensus state is first checked with [should_sync]; if that
    check fails the result is [`Ignored] and nothing is requested. Otherwise
    the ancestry proof is requested from [sender] and checked with
    [Sync_handler.Root.verify]:
    - if the request fails, the error is logged and the result is [`Ignored];
    - if verification fails, the failure is reported via [received_bad_proof]
      (fire-and-forget) and the result is [`Ignored];
    - if verification succeeds and the ledger sync has not finished in the
      meantime, the sync target is reset through [start_sync_job_with_peer]
      and its result is returned; otherwise the result is [`Ignored]. *)
let on_transition ({ context = (module Context); _ } as t) ~sender
    ~root_sync_ledger candidate_header =
  let open Context in
  let candidate_consensus_state =
    With_hash.map candidate_header ~f:to_consensus_state
  in
  if not (should_sync ~root_sync_ledger t candidate_consensus_state) then
    Deferred.return `Ignored
  else
    let ancestry_query =
      With_hash.map_hash candidate_consensus_state
        ~f:State_hash.State_hashes.state_hash
    in
    match%bind
      Mina_networking.get_ancestry t.network sender.Peer.peer_id ancestry_query
    with
    | Error e ->
        [%log error]
          ~metadata:[ ("error", Error_json.error_to_yojson e) ]
          !"Could not get the proof of the root transition from the network: \
            $error" ;
        Deferred.return `Ignored
    | Ok peer_root_with_proof -> (
        match%bind
          Sync_handler.Root.verify
            ~context:(module Context)
            ~verifier:t.verifier candidate_consensus_state
            peer_root_with_proof.data
        with
        | Error e ->
            (* The trust-system update is deliberately not awaited. *)
            don't_wait_for (received_bad_proof t sender e) ;
            return `Ignored
        | Ok (`Root root, `Best_tip best_tip) ->
            (* The sync may have completed while the proof was in flight. *)
            if done_syncing_root root_sync_ledger then return `Ignored
            else
              start_sync_job_with_peer ~sender ~root_sync_ledger t best_tip root
        )
×
187

188
(** Drive the root ledger sync from incoming blocks and headers.

    The query reader and answer writer of [root_sync_ledger] are first
    connected to the network with [Mina_networking.glue_sync_ledger]. Then,
    for every item read from [sync_ledger_reader]:
    - the item is added to [transition_graph] under its parent state hash,
      together with its validation callback;
    - if [worth_getting_root] accepts its consensus state, [on_transition] is
      run for it and its result is discarded.

    The sender is obtained with [Envelope.Incoming.remote_sender_exn].
    The returned deferred is the one produced by [Reader.iter]. *)
let sync_ledger ({ context = (module Context); _ } as t) ~preferred
    ~root_sync_ledger ~transition_graph ~sync_ledger_reader =
  let open Context in
  let query_reader = Sync_ledger.Db.query_reader root_sync_ledger in
  let response_writer = Sync_ledger.Db.answer_writer root_sync_ledger in
  Mina_networking.glue_sync_ledger ~preferred t.network query_reader
    response_writer ;
  (* Normalise a block or header envelope into
     (header with hash, remote sender, element stored in the cache). *)
  let unpack = function
    | `Block b_env ->
        ( With_hash.map ~f:Mina_block.header
            (Mina_block.Validation.block_with_hash
               (Envelope.Incoming.data b_env) )
        , Envelope.Incoming.remote_sender_exn b_env
        , Envelope.Incoming.map ~f:(fun x -> `Block x) b_env )
    | `Header h_env ->
        ( Mina_block.Validation.header_with_hash (Envelope.Incoming.data h_env)
        , Envelope.Incoming.remote_sender_exn h_env
        , Envelope.Incoming.map ~f:(fun x -> `Header x) h_env )
  in
  let handle (b_or_h, `Valid_cb vc) =
    let header_with_hash, sender, transition_cache_element = unpack b_or_h in
    let previous_state_hash =
      Protocol_state.previous_state_hash
        (Mina_block.Header.protocol_state (With_hash.data header_with_hash))
    in
    Transition_cache.add transition_graph ~parent:previous_state_hash
      (transition_cache_element, vc) ;
    (* TODO: Efficiently limiting the number of green threads in #1337 *)
    let candidate_state =
      With_hash.map ~f:to_consensus_state header_with_hash
    in
    if worth_getting_root t candidate_state then (
      [%log trace] "Added the transition from sync_ledger_reader into cache"
        ~metadata:
          [ ( "state_hash"
            , State_hash.to_yojson
                (State_hash.With_state_hashes.state_hash header_with_hash) )
          ; ( "header"
            , Mina_block.Header.to_yojson (With_hash.data header_with_hash) )
          ] ;
      on_transition t ~sender ~root_sync_ledger header_with_hash
      |> Deferred.ignore_m )
    else Deferred.unit
  in
  Reader.iter sync_ledger_reader ~f:handle
×
237

238
(** Build a comparison function on headers-with-hash from consensus selection.

    Two headers with equal state hashes compare as [0]. Otherwise the result
    is [-1] when selection answers [`Keep] for the first argument (as
    [~existing]) against the second (as [~candidate]), and [1] in every other
    case.

    NOTE(review): an earlier comment here said the logger given to the
    selection is set to null to avoid log spam, but [Context]'s logger is
    forwarded unchanged. Confirm which behaviour is intended. *)
let external_transition_compare ~context:(module Context : CONTEXT) =
  let module Consensus_context = struct
    include Context

    let compile_config = precomputed_values.compile_config
  end in
  let consensus_state_of header =
    Protocol_state.consensus_state (Mina_block.Header.protocol_state header)
  in
  let project = With_hash.map ~f:consensus_state_of in
  let compare_states existing candidate =
    let same_block =
      State_hash.equal
        (State_hash.With_state_hashes.state_hash existing)
        (State_hash.With_state_hashes.state_hash candidate)
    in
    if same_block then 0
    else
      let verdict =
        Consensus.Hooks.select
          ~context:(module Consensus_context)
          ~existing ~candidate
      in
      if Consensus.Hooks.equal_select_status `Keep verdict then -1 else 1
  in
  fun left right -> compare_states (project left) (project right)
263

264
(** The entry point function for bootstrap controller. When bootstrap finished
265
    it would return a transition frontier with the root breadcrumb and a list
266
    of transitions collected during bootstrap.
267

268
    Bootstrap controller would do the following steps to construct the
269
    transition frontier:
270
    1. Download the root snarked_ledger.
271
    2. Download the scan state and pending coinbases.
272
    3. Construct the staged ledger from the snarked ledger, scan state and
273
       pending coinbases.
274
    4. Synchronize the consensus local state if necessary.
275
    5. Close the old frontier and reload a new one from disk.
276
 *)
277
let run ~context:(module Context : CONTEXT) ~trust_system ~verifier ~network
278
    ~consensus_local_state ~transition_reader ~preferred_peers ~persistent_root
279
    ~persistent_frontier ~initial_root_transition ~catchup_mode =
280
  let open Context in
×
281
  O1trace.thread "bootstrap" (fun () ->
282
      let rec loop previous_cycles =
×
283
        let sync_ledger_pipe = "sync ledger pipe" in
×
284
        let sync_ledger_reader, sync_ledger_writer =
285
          create ~name:sync_ledger_pipe
286
            (Buffered
287
               ( `Capacity 50
288
               , `Overflow
289
                   (Drop_head
290
                      (fun (b_or_h, `Valid_cb valid_cb) ->
291
                        let hash =
×
292
                          match b_or_h with
293
                          | `Block b_env ->
×
294
                              Envelope.Incoming.data b_env
×
295
                              |> Mina_block.Validation.block_with_hash
×
296
                              |> With_hash.hash
×
297
                          | `Header h_env ->
×
298
                              Envelope.Incoming.data h_env
×
299
                              |> Mina_block.Validation.header_with_hash
×
300
                              |> With_hash.hash
×
301
                        in
302
                        Mina_metrics.(
303
                          Counter.inc_one
×
304
                            Pipe.Drop_on_overflow.bootstrap_sync_ledger) ;
305
                        Mina_block.handle_dropped_transition ?valid_cb hash
306
                          ~pipe_name:sync_ledger_pipe ~logger ) ) ) )
307
        in
308
        don't_wait_for
×
309
          (transfer_while_writer_alive transition_reader sync_ledger_writer
×
310
             ~f:Fn.id ) ;
311
        let initial_root_transition =
×
312
          initial_root_transition |> Mina_block.Validated.remember
×
313
          |> Mina_block.Validation.reset_frontier_dependencies_validation
×
314
          |> Mina_block.Validation.reset_staged_ledger_diff_validation
315
        in
316
        let t =
×
317
          { network
318
          ; context = (module Context)
319
          ; trust_system
320
          ; verifier
321
          ; best_seen_transition = initial_root_transition
322
          ; current_root = initial_root_transition
323
          ; num_of_root_snarked_ledger_retargeted = 0
324
          }
325
        in
326
        let transition_graph = Transition_cache.create () in
327
        let temp_persistent_root_instance =
×
328
          Transition_frontier.Persistent_root.create_instance_exn
329
            persistent_root
330
        in
331
        let temp_snarked_ledger =
×
332
          Transition_frontier.Persistent_root.Instance.snarked_ledger
333
            temp_persistent_root_instance
334
        in
335
        (* step 1. download snarked_ledger *)
336
        let module Consensus_context = struct
×
337
          include Context
338

339
          let compile_config = precomputed_values.compile_config
340
        end in
341
        let%bind sync_ledger_time, (hash, sender, expected_staged_ledger_hash) =
342
          time_deferred
×
343
            (let root_sync_ledger =
344
               Sync_ledger.Db.create temp_snarked_ledger
345
                 ~context:(module Consensus_context)
346
                 ~trust_system
347
             in
348
             don't_wait_for
×
349
               (sync_ledger t ~preferred:preferred_peers ~root_sync_ledger
×
350
                  ~transition_graph ~sync_ledger_reader ) ;
351
             (* We ignore the resulting ledger returned here since it will always
352
                * be the same as the ledger we started with because we are syncing
353
                * a db ledger. *)
354
             let%map _, data = Sync_ledger.Db.valid_tree root_sync_ledger in
×
355
             Sync_ledger.Db.destroy root_sync_ledger ;
×
356
             data )
×
357
        in
358
        Mina_metrics.(
×
359
          Counter.inc Bootstrap.root_snarked_ledger_sync_ms
×
360
            Time.Span.(to_ms sync_ledger_time)) ;
×
361
        Mina_metrics.(
362
          Gauge.set Bootstrap.num_of_root_snarked_ledger_retargeted
×
363
            (Float.of_int t.num_of_root_snarked_ledger_retargeted)) ;
×
364
        (* step 2. Download scan state and pending coinbases. *)
365
        let%bind ( staged_ledger_data_download_time
366
                 , staged_ledger_construction_time
367
                 , staged_ledger_aux_result ) =
368
          let%bind ( staged_ledger_data_download_time
369
                   , staged_ledger_data_download_result ) =
370
            time_deferred
×
371
              (Mina_networking
372
               .get_staged_ledger_aux_and_pending_coinbases_at_hash t.network
×
373
                 sender.peer_id hash )
374
          in
375
          match staged_ledger_data_download_result with
×
376
          | Error err ->
×
377
              Deferred.return (staged_ledger_data_download_time, None, Error err)
378
          | Ok
×
379
              ( scan_state
380
              , expected_merkle_root
381
              , pending_coinbases
382
              , protocol_states ) -> (
383
              let%map staged_ledger_construction_result =
384
                O1trace.thread "construct_root_staged_ledger" (fun () ->
×
385
                    let open Deferred.Or_error.Let_syntax in
×
386
                    let received_staged_ledger_hash =
387
                      Staged_ledger_hash.of_aux_ledger_and_coinbase_hash
388
                        (Staged_ledger.Scan_state.hash scan_state)
×
389
                        expected_merkle_root pending_coinbases
390
                    in
391
                    [%log debug]
×
392
                      ~metadata:
393
                        [ ( "expected_staged_ledger_hash"
394
                          , Staged_ledger_hash.to_yojson
×
395
                              expected_staged_ledger_hash )
396
                        ; ( "received_staged_ledger_hash"
397
                          , Staged_ledger_hash.to_yojson
×
398
                              received_staged_ledger_hash )
399
                        ]
400
                      "Comparing $expected_staged_ledger_hash to \
401
                       $received_staged_ledger_hash" ;
402
                    let%bind new_root =
403
                      t.current_root
404
                      |> Mina_block.Validation
405
                         .skip_frontier_dependencies_validation
×
406
                           `This_block_belongs_to_a_detached_subtree
407
                      |> Mina_block.Validation.validate_staged_ledger_hash
×
408
                           (`Staged_ledger_already_materialized
409
                             received_staged_ledger_hash )
410
                      |> Result.map_error ~f:(fun _ ->
×
411
                             Error.of_string
×
412
                               "received faulty scan state from peer" )
413
                      |> Deferred.return
×
414
                    in
415
                    let protocol_states =
×
416
                      List.map protocol_states
417
                        ~f:(With_hash.of_data ~hash_data:Protocol_state.hashes)
418
                    in
419
                    let%bind protocol_states =
420
                      Staged_ledger.Scan_state.check_required_protocol_states
×
421
                        scan_state ~protocol_states
422
                      |> Deferred.return
×
423
                    in
424
                    let protocol_states_map =
×
425
                      protocol_states
426
                      |> List.map ~f:(fun ps ->
×
427
                             (State_hash.With_state_hashes.state_hash ps, ps) )
×
428
                      |> State_hash.Map.of_alist_exn
429
                    in
430
                    let get_state hash =
×
431
                      match Map.find protocol_states_map hash with
×
432
                      | None ->
×
433
                          let new_state_hash =
434
                            State_hash.With_state_hashes.state_hash
435
                              (fst new_root)
×
436
                          in
437
                          [%log error]
×
438
                            ~metadata:
439
                              [ ("new_root", State_hash.to_yojson new_state_hash)
×
440
                              ; ("state_hash", State_hash.to_yojson hash)
×
441
                              ]
442
                            "Protocol state (for scan state transactions) for \
443
                             $state_hash not found when bootstrapping to the \
444
                             new root $new_root" ;
445
                          Or_error.errorf
×
446
                            !"Protocol state (for scan state transactions) for \
×
447
                              %{sexp:State_hash.t} not found when \
448
                              bootstrapping to the new root \
449
                              %{sexp:State_hash.t}"
450
                            hash new_state_hash
451
                      | Some protocol_state ->
×
452
                          Ok (With_hash.data protocol_state)
×
453
                    in
454
                    (* step 3. Construct staged ledger from snarked ledger, scan state
455
                       and pending coinbases. *)
456
                    (* Construct the staged ledger before constructing the transition
457
                     * frontier in order to verify the scan state we received.
458
                     * TODO: reorganize the code to avoid doing this twice (#3480) *)
459
                    let open Deferred.Let_syntax in
460
                    let%map staged_ledger_construction_time, construction_result
461
                        =
462
                      time_deferred
×
463
                        (let open Deferred.Let_syntax in
464
                        let temp_mask =
465
                          Ledger.of_database temp_snarked_ledger
466
                        in
467
                        let%map result =
468
                          Staged_ledger
469
                          .of_scan_state_pending_coinbases_and_snarked_ledger
470
                            ~logger
471
                            ~snarked_local_state:
472
                              Mina_block.(
473
                                t.current_root |> Validation.block |> header
×
474
                                |> Header.protocol_state
×
475
                                |> Protocol_state.blockchain_state
×
476
                                |> Blockchain_state.snarked_local_state)
×
477
                            ~verifier ~constraint_constants ~scan_state
478
                            ~snarked_ledger:temp_mask ~expected_merkle_root
479
                            ~pending_coinbases ~get_state
480
                        in
481
                        ignore
×
482
                          ( Ledger.Maskable.unregister_mask_exn ~loc:__LOC__
×
483
                              temp_mask
484
                            : Ledger.unattached_mask ) ;
485
                        Result.map result
486
                          ~f:
487
                            (const
×
488
                               ( scan_state
489
                               , pending_coinbases
490
                               , new_root
491
                               , protocol_states ) ))
492
                    in
493
                    Ok (staged_ledger_construction_time, construction_result) )
×
494
              in
495
              match staged_ledger_construction_result with
×
496
              | Error err ->
×
497
                  (staged_ledger_data_download_time, None, Error err)
498
              | Ok (staged_ledger_construction_time, result) ->
×
499
                  ( staged_ledger_data_download_time
500
                  , Some staged_ledger_construction_time
501
                  , result ) )
502
        in
503
        Transition_frontier.Persistent_root.Instance.close
×
504
          temp_persistent_root_instance ;
505
        match staged_ledger_aux_result with
×
506
        | Error e ->
×
507
            let%bind () =
508
              Trust_system.(
509
                record t.trust_system logger sender
×
510
                  Actions.
511
                    ( Outgoing_connection_error
512
                    , Some
513
                        ( "Can't find scan state from the peer or received \
514
                           faulty scan state from the peer."
515
                        , [] ) ))
516
            in
517
            [%log error]
×
518
              ~metadata:
519
                [ ("error", Error_json.error_to_yojson e)
×
520
                ; ("state_hash", State_hash.to_yojson hash)
×
521
                ; ( "expected_staged_ledger_hash"
522
                  , Staged_ledger_hash.to_yojson expected_staged_ledger_hash )
×
523
                ]
524
              "Failed to find scan state for the transition with hash \
525
               $state_hash from the peer or received faulty scan state: \
526
               $error. Retry bootstrap" ;
527
            Writer.close sync_ledger_writer ;
×
528
            let this_cycle =
×
529
              { cycle_result = "failed to download and construct scan state"
530
              ; sync_ledger_time
531
              ; staged_ledger_data_download_time
532
              ; staged_ledger_construction_time
533
              ; local_state_sync_required = false
534
              ; local_state_sync_time = None
535
              }
536
            in
537
            loop (this_cycle :: previous_cycles)
538
        | Ok (scan_state, pending_coinbase, new_root, protocol_states) -> (
×
539
            let%bind () =
540
              Trust_system.(
541
                record t.trust_system logger sender
×
542
                  Actions.
543
                    ( Fulfilled_request
544
                    , Some ("Received valid scan state from peer", []) ))
545
            in
546
            let best_seen_block_with_hash, _ = t.best_seen_transition in
×
547
            let consensus_state =
548
              With_hash.data best_seen_block_with_hash
×
549
              |> Mina_block.header |> Mina_block.Header.protocol_state
×
550
              |> Protocol_state.consensus_state
551
            in
552
            (* step 4. Synchronize consensus local state if necessary *)
553
            let%bind ( local_state_sync_time
554
                     , (local_state_sync_required, local_state_sync_result) ) =
555
              time_deferred
×
556
                ( match
557
                    Consensus.Hooks.required_local_state_sync
558
                      ~constants:precomputed_values.consensus_constants
559
                      ~consensus_state ~local_state:consensus_local_state
560
                  with
561
                | None ->
×
562
                    [%log debug]
×
563
                      ~metadata:
564
                        [ ( "local_state"
565
                          , Consensus.Data.Local_state.to_yojson
×
566
                              consensus_local_state )
567
                        ; ( "consensus_state"
568
                          , Consensus.Data.Consensus_state.Value.to_yojson
×
569
                              consensus_state )
570
                        ]
571
                      "Not synchronizing consensus local state" ;
572
                    Deferred.return (false, Or_error.return ())
×
573
                | Some sync_jobs ->
×
574
                    [%log info] "Synchronizing consensus local state" ;
×
575
                    let%map result =
576
                      Consensus.Hooks.sync_local_state
×
577
                        ~context:(module Consensus_context)
578
                        ~local_state:consensus_local_state ~trust_system
579
                        ~glue_sync_ledger:
580
                          (Mina_networking.glue_sync_ledger t.network)
×
581
                        sync_jobs
582
                    in
583
                    (true, result) )
×
584
            in
585
            match local_state_sync_result with
×
586
            | Error e ->
×
587
                [%log error]
×
588
                  ~metadata:[ ("error", Error_json.error_to_yojson e) ]
×
589
                  "Local state sync failed: $error. Retry bootstrap" ;
590
                Writer.close sync_ledger_writer ;
×
591
                let this_cycle =
×
592
                  { cycle_result = "failed to synchronize local state"
593
                  ; sync_ledger_time
594
                  ; staged_ledger_data_download_time
595
                  ; staged_ledger_construction_time
596
                  ; local_state_sync_required
597
                  ; local_state_sync_time = Some local_state_sync_time
598
                  }
599
                in
600
                loop (this_cycle :: previous_cycles)
601
            | Ok () ->
×
602
                (* step 5. Close the old frontier and reload a new one from disk. *)
603
                let new_root_data : Transition_frontier.Root_data.Limited.t =
604
                  Transition_frontier.Root_data.Limited.create
605
                    ~transition:(Mina_block.Validated.lift new_root)
×
606
                    ~scan_state ~pending_coinbase ~protocol_states
607
                in
608
                let%bind () =
609
                  Transition_frontier.Persistent_frontier.reset_database_exn
×
610
                    persistent_frontier ~root_data:new_root_data
611
                    ~genesis_state_hash:
612
                      (State_hash.With_state_hashes.state_hash
×
613
                         precomputed_values.protocol_state_with_hashes )
614
                in
615
                (* TODO: lazy load db in persistent root to avoid unecessary opens like this *)
616
                Transition_frontier.Persistent_root.(
×
617
                  with_instance_exn persistent_root ~f:(fun instance ->
×
618
                      Instance.set_root_state_hash instance
×
619
                      @@ Mina_block.Validated.state_hash
×
620
                      @@ Mina_block.Validated.lift new_root )) ;
×
621
                let%map new_frontier =
622
                  let fail msg =
623
                    failwith
×
624
                      ( "failed to initialize transition frontier after \
625
                         bootstrapping: " ^ msg )
626
                  in
627
                  Transition_frontier.load
×
628
                    ~context:(module Consensus_context)
629
                    ~retry_with_fresh_db:false ~verifier ~consensus_local_state
630
                    ~persistent_root ~persistent_frontier ~catchup_mode ()
631
                  >>| function
×
632
                  | Ok frontier ->
×
633
                      frontier
634
                  | Error (`Failure msg) ->
×
635
                      fail msg
636
                  | Error `Bootstrap_required ->
×
637
                      fail
638
                        "bootstrap still required (indicates logical error in \
639
                         code)"
640
                  | Error `Persistent_frontier_malformed ->
×
641
                      fail "persistent frontier was malformed"
642
                  | Error `Snarked_ledger_mismatch ->
×
643
                      fail
644
                        "this should not happen, because we just reset the \
645
                         snarked_ledger"
646
                in
647
                [%str_log info] Bootstrap_complete ;
×
648
                let collected_transitions =
×
649
                  Transition_cache.data transition_graph
650
                in
651
                let logger =
×
652
                  Logger.extend logger
653
                    [ ( "context"
654
                      , `String "Filter collected transitions in bootstrap" )
655
                    ]
656
                in
657
                let root_consensus_state =
×
658
                  Transition_frontier.(
659
                    Breadcrumb.consensus_state_with_hashes (root new_frontier))
×
660
                in
661
                let filtered_collected_transitions =
662
                  List.filter collected_transitions
663
                    ~f:(fun (incoming_transition, _) ->
664
                      let transition =
×
665
                        Envelope.Incoming.data incoming_transition
×
666
                        |> Transition_cache.header_with_hash
667
                      in
668
                      Consensus.Hooks.equal_select_status `Take
×
669
                      @@ Consensus.Hooks.select
670
                           ~context:(module Consensus_context)
671
                           ~existing:root_consensus_state
672
                           ~candidate:
673
                             (With_hash.map
×
674
                                ~f:
675
                                  (Fn.compose Protocol_state.consensus_state
×
676
                                     Mina_block.Header.protocol_state )
677
                                transition ) )
678
                in
679
                [%log debug] "Sorting filtered transitions by consensus state"
×
680
                  ~metadata:[] ;
681
                let sorted_filtered_collected_transitions =
×
682
                  O1trace.sync_thread "sorting_collected_transitions" (fun () ->
683
                      List.sort filtered_collected_transitions
×
684
                        ~compare:
685
                          (Comparable.lift
×
686
                             ~f:(fun (x, _) ->
687
                               Transition_cache.header_with_hash
×
688
                               @@ Envelope.Incoming.data x )
×
689
                             (external_transition_compare
690
                                ~context:(module Context) ) ) )
691
                in
692
                let this_cycle =
×
693
                  { cycle_result = "success"
694
                  ; sync_ledger_time
695
                  ; staged_ledger_data_download_time
696
                  ; staged_ledger_construction_time
697
                  ; local_state_sync_required
698
                  ; local_state_sync_time = Some local_state_sync_time
699
                  }
700
                in
701
                ( this_cycle :: previous_cycles
702
                , (new_frontier, sorted_filtered_collected_transitions) ) )
703
      in
704
      let%map time_elapsed, (cycles, result) = time_deferred (loop []) in
×
705
      [%log info] "Bootstrap completed in $time_elapsed: $bootstrap_stats"
×
706
        ~metadata:
707
          [ ("time_elapsed", time_to_yojson time_elapsed)
×
708
          ; ( "bootstrap_stats"
709
            , `List (List.map ~f:bootstrap_cycle_stats_to_yojson cycles) )
×
710
          ] ;
711
      Mina_metrics.(
×
712
        Gauge.set Bootstrap.bootstrap_time_ms
×
713
          Core.Time.(Span.to_ms @@ time_elapsed)) ;
×
714
      result )
715

716
let%test_module "Bootstrap_controller tests" =
717
  ( module struct
718
    open Pipe_lib
719

720
    (** Frontier length bound used by the tests in this module, obtained from
        [Transition_frontier.global_max_length] applied to the unit-test
        genesis constants. *)
    let max_frontier_length =
      let genesis_constants = Genesis_constants.For_unit_tests.t in
      Transition_frontier.global_max_length genesis_constants

    (** Logger created with [Logger.null ()]; it is the [logger] captured by
        [Context] and by the [%log] calls in this test module. *)
    let logger = Logger.null ()
×
724

725
    (* Disable log messages from best_tip_diff logger: register a consumer for
       that logger id whose transport ignores everything it is handed. *)
    let () =
      let module Discard_transport = struct
        type t = unit

        let transport () _ = ()
      end in
      let transport = Logger.Transport.create (module Discard_transport) () in
      let processor = Logger.Processor.raw () in
      Logger.Consumer_registry.register ~commit_id:Mina_version.commit_id
        ~id:Logger.Logger_id.best_tip_diff ~processor ~transport ()
738

739
    (** Trust system shared by the tests. It is built with
        [Trust_system.null ()], and a background reader is started on its
        upcall pipe that discards every message it reads. *)
    let trust_system =
      let null_trust_system = Trust_system.null () in
      let upcalls = Trust_system.upcall_pipe null_trust_system in
      Pipe_lib.Strict_pipe.Reader.iter upcalls ~f:(fun _ -> Deferred.unit)
      |> don't_wait_for ;
      null_trust_system
×
746

747
    (** Precomputed values for unit tests, forced once when this module is
        initialised. *)
    let precomputed_values = Precomputed_values.for_unit_tests |> Lazy.force

    (** Proof level recorded in [precomputed_values]. *)
    let proof_level = precomputed_values.proof_level

    (** Constraint constants recorded in [precomputed_values]. *)
    let constraint_constants = precomputed_values.constraint_constants
752

753
    (** Context module packed as [(module Context)] for the code under test.
        It reuses this module's [logger] and [precomputed_values]. Note that
        [constraint_constants] is read directly from
        [Genesis_constants.For_unit_tests] rather than from
        [precomputed_values]. *)
    module Context = struct
      let logger = logger

      let constraint_constants =
        Genesis_constants.For_unit_tests.Constraint_constants.t

      let consensus_constants = precomputed_values.consensus_constants

      let precomputed_values = precomputed_values
    end
763

764
    (** Verifier shared by the tests, obtained by blocking on
        [Verifier.For_tests.default]. *)
    let verifier =
      let create_verifier () =
        Verifier.For_tests.default ~logger ~proof_level ~constraint_constants
          ()
      in
      Async.Thread_safe.block_on_async_exn create_verifier
768

769
    (** Genesis ledger module unpacked from [precomputed_values]. *)
    module Genesis_ledger = (val precomputed_values.genesis_ledger)
770

771
    (** Turns a validated block back into an incoming envelope: the block is
        passed through [Mina_block.Validated.remember], its
        frontier-dependencies and staged-ledger-diff validations are reset,
        and the result is wrapped as coming from the remote peer [sender]. *)
    let downcast_transition ~sender transition =
      let remembered = Mina_block.Validated.remember transition in
      let without_frontier_dependencies =
        Mina_block.Validation.reset_frontier_dependencies_validation
          remembered
      in
      let data =
        Mina_block.Validation.reset_staged_ledger_diff_validation
          without_frontier_dependencies
      in
      let sender = Envelope.Sender.Remote sender in
      Envelope.Incoming.wrap ~data ~sender
779

780
    (** Same as [downcast_transition], applied to the validated transition
        held by [breadcrumb]. *)
    let downcast_breadcrumb ~sender breadcrumb =
      breadcrumb
      |> Transition_frontier.Breadcrumb.validated_transition
      |> downcast_transition ~sender
×
783

784
    (** Builds a bootstrap state record directly; nothing is run here. Both
        [best_seen_transition] and [current_root] are set to [genesis_root]
        with its frontier-dependencies and staged-ledger-diff validations
        reset, and the retarget counter starts at zero. *)
    let make_non_running_bootstrap ~genesis_root ~network =
      let initial_block =
        Mina_block.Validation.reset_staged_ledger_diff_validation
          (Mina_block.Validation.reset_frontier_dependencies_validation
             genesis_root )
      in
      { context = (module Context)
      ; trust_system
      ; verifier
      ; best_seen_transition = initial_block
      ; current_root = initial_block
      ; network
      ; num_of_root_snarked_ledger_retargeted = 0
      }
798

799
    (* Feeds a generated branch of blocks to [sync_ledger] through its reader
       pipe, then checks that the headers stored in the transition cache are
       exactly the headers of that branch (compared as sets). *)
    let%test_unit "Bootstrap controller caches all transitions it is passed \
                   through the transition_reader" =
      let branch_size = (2 * max_frontier_length) + 2 in
      let network_and_branch_gen =
        let open Quickcheck.Generator.Let_syntax in
        (* Only one node is exercised here, but more than one peer is needed so
           that mina_networking does not throw an error. *)
        let%bind fake_network =
          Fake_network.Generator.(
            gen ~precomputed_values ~verifier ~max_frontier_length
              [ fresh_peer; fresh_peer ] ~use_super_catchup:false)
        in
        let%map make_branch =
          Transition_frontier.Breadcrumb.For_tests.gen_seq ~precomputed_values
            ~verifier
            ~accounts_with_secret_keys:(Lazy.force Genesis_ledger.accounts)
            branch_size
        in
        let [ me; _ ] = fake_network.peer_networks in
        let branch =
          Async.Thread_safe.block_on_async_exn (fun () ->
              make_branch (Transition_frontier.root me.state.frontier) )
        in
        (fake_network, branch)
      in
      Quickcheck.test ~trials:1 network_and_branch_gen
        ~f:(fun (fake_network, branch) ->
          let [ me; other ] = fake_network.peer_networks in
          let genesis_root =
            Mina_block.Validated.remember
              (Transition_frontier.Breadcrumb.validated_transition
                 (Transition_frontier.root me.state.frontier) )
          in
          let transition_graph = Transition_cache.create () in
          let sync_ledger_reader, sync_ledger_writer =
            Pipe_lib.Strict_pipe.create ~name:"sync_ledger_reader" Synchronous
          in
          let bootstrap =
            make_non_running_bootstrap ~genesis_root ~network:me.network
          in
          let module Consensus_context = struct
            include Context

            let compile_config = precomputed_values.compile_config
          end in
          let root_sync_ledger =
            let snarked_ledger =
              Transition_frontier.root_snarked_ledger me.state.frontier
            in
            Sync_ledger.Db.create snarked_ledger
              ~context:(module Consensus_context)
              ~trust_system
          in
          Async.Thread_safe.block_on_async_exn (fun () ->
              (* Start syncing first; the blocks are then pushed through the
                 synchronous pipe, and closing the writer lets the sync
                 finish. *)
              let sync_deferred =
                sync_ledger bootstrap ~root_sync_ledger ~transition_graph
                  ~preferred:[] ~sync_ledger_reader
              in
              let feed breadcrumb =
                let block =
                  downcast_breadcrumb ~sender:other.peer breadcrumb
                in
                Strict_pipe.Writer.write sync_ledger_writer
                  (`Block block, `Valid_cb None)
              in
              let%bind () = Deferred.List.iter branch ~f:feed in
              Strict_pipe.Writer.close sync_ledger_writer ;
              sync_deferred ) ;
          let expected_transitions =
            List.map branch ~f:(fun breadcrumb ->
                breadcrumb
                |> Transition_frontier.Breadcrumb.validated_transition
                |> Mina_block.Validated.remember
                |> Mina_block.Validation.block_with_hash
                |> With_hash.map ~f:Mina_block.header )
          in
          let saved_transitions =
            List.map (Transition_cache.data transition_graph)
              ~f:(fun (envelope, _) ->
                Transition_cache.header_with_hash
                  (Envelope.Incoming.data envelope) )
          in
          let module Header_cmp = struct
            module T = struct
              type t = Mina_block.Header.t State_hash.With_state_hashes.t
              [@@deriving sexp]

              let compare = external_transition_compare ~context:(module Context)
            end

            include Comparable.Make (T)
          end in
          [%test_result: Header_cmp.Set.t]
            (Header_cmp.Set.of_list saved_transitions)
            ~expect:(Header_cmp.Set.of_list expected_transitions) )
×
888

889
    (** Runs the bootstrap controller for [my_net] under a timeout.

        The persistent root, persistent frontier and root transition are read
        from [my_net]'s current frontier, which is then closed before [run] is
        started. The result of [run] is awaited through
        [Block_time.Timeout.await_exn] with [timeout_duration] (presumably
        raising if the timeout elapses, per the [_exn] suffix). *)
    let run_bootstrap ~timeout_duration ~my_net ~transition_reader =
      let open Fake_network in
      let time_controller = Block_time.Controller.basic ~logger in
      let frontier = my_net.state.frontier in
      let persistent_root = Transition_frontier.persistent_root frontier in
      let persistent_frontier =
        Transition_frontier.persistent_frontier frontier
      in
      let initial_root_transition =
        Transition_frontier.Breadcrumb.validated_transition
          (Transition_frontier.root frontier)
      in
      let%bind () = Transition_frontier.close ~loc:__LOC__ frontier in
      [%log info] "bootstrap begin" ;
      let bootstrap_result =
        run
          ~context:(module Context)
          ~trust_system ~verifier ~network:my_net.network ~preferred_peers:[]
          ~consensus_local_state:my_net.state.consensus_local_state
          ~transition_reader ~persistent_root ~persistent_frontier
          ~catchup_mode:`Normal ~initial_root_transition
      in
      Block_time.Timeout.await_exn time_controller ~timeout_duration
        bootstrap_result
913

914
    (** Checks that blockchain lengths never decrease along
        [incoming_transitions], starting from the header of [root]'s block.
        Each header is compared with the one before it using [<=] on
        [Mina_numbers.Length]; the first violation is turned into an exception
        by [Or_error.ok_exn]. *)
    let assert_transitions_increasingly_sorted ~root
        (incoming_transitions : Transition_cache.element list) =
      let length_of_header header =
        Consensus.Data.Consensus_state.blockchain_length
          (Protocol_state.consensus_state
             (Mina_block.Header.protocol_state header) )
      in
      let root_header =
        Mina_block.header (Transition_frontier.Breadcrumb.block root)
      in
      let check_next previous_header (envelope, _) =
        let With_hash.{ data = header; _ } =
          Transition_cache.header_with_hash (Envelope.Incoming.data envelope)
        in
        let previous_length = length_of_header previous_header in
        let current_length = length_of_header header in
        if Mina_numbers.Length.( <= ) previous_length current_length then
          Ok header
        else
          Error
            (Error.of_string "The blocks are not sorted in increasing order")
      in
      let (_ : Mina_block.Header.t) =
        List.fold_result incoming_transitions ~init:root_header ~f:check_next
        |> Or_error.ok_exn
      in
      ()
942

943
    (* Bootstraps a fresh peer against a peer holding a branch of
       [(max_frontier_length * 2) + 2] blocks. The peer's best tip is written
       to the transition pipe before bootstrap starts; afterwards the returned
       transitions must be sorted by blockchain length and the new frontier's
       root snarked ledger must have the same merkle root as the peer's. *)
    let%test_unit "sync with one node after receiving a transition" =
      Quickcheck.test ~trials:1
        Fake_network.Generator.(
          gen ~precomputed_values ~verifier ~max_frontier_length
            ~use_super_catchup:false
            [ fresh_peer
            ; peer_with_branch
                ~frontier_branch_size:((max_frontier_length * 2) + 2)
            ])
        ~f:(fun fake_network ->
          let [ my_net; peer_net ] = fake_network.peer_networks in
          let transition_reader, transition_writer =
            Pipe_lib.Strict_pipe.create ~name:(__MODULE__ ^ __LOC__)
              (Buffered (`Capacity 10, `Overflow (Drop_head ignore)))
          in
          (* Use the shared [downcast_breadcrumb] helper instead of repeating
             its remember / reset-validation / wrap chain inline. *)
          let block =
            downcast_breadcrumb ~sender:peer_net.peer
              (Transition_frontier.best_tip peer_net.state.frontier)
          in
          Pipe_lib.Strict_pipe.Writer.write transition_writer
            (`Block block, `Valid_cb None) ;
          let new_frontier, sorted_external_transitions =
            Async.Thread_safe.block_on_async_exn (fun () ->
                run_bootstrap
                  ~timeout_duration:(Block_time.Span.of_ms 30_000L)
                  ~my_net ~transition_reader )
          in
          assert_transitions_increasingly_sorted
            ~root:(Transition_frontier.root new_frontier)
            sorted_external_transitions ;
          [%test_result: Ledger_hash.t]
            ( Ledger.Db.merkle_root
            @@ Transition_frontier.root_snarked_ledger new_frontier )
            ~expect:
              ( Ledger.Db.merkle_root
              @@ Transition_frontier.root_snarked_ledger peer_net.state.frontier
              ) )
986

987
    (* For every breadcrumb of a generated frontier, rebuilds a staged ledger
       from the breadcrumb's scan state and pending coinbases plus the
       frontier's root snarked ledger, and asserts that the rebuilt ledger has
       the same hash as the breadcrumb's own staged ledger. *)
    let%test_unit "reconstruct staged_ledgers using \
                   of_scan_state_and_snarked_ledger" =
      let frontier_gen =
        Transition_frontier.For_tests.gen ~precomputed_values ~verifier
          ~max_length:max_frontier_length ~size:max_frontier_length ()
      in
      Quickcheck.test ~trials:1 frontier_gen ~f:(fun frontier ->
          Thread_safe.block_on_async_exn (fun () ->
              Deferred.List.iter
                (Transition_frontier.all_breadcrumbs frontier)
                ~f:(fun breadcrumb ->
                  let staged_ledger =
                    Transition_frontier.Breadcrumb.staged_ledger breadcrumb
                  in
                  let expected_merkle_root =
                    Ledger.merkle_root (Staged_ledger.ledger staged_ledger)
                  in
                  let snarked_ledger =
                    Ledger.of_database
                      (Transition_frontier.root_snarked_ledger frontier)
                  in
                  let snarked_local_state =
                    Blockchain_state.snarked_local_state
                      (Protocol_state.blockchain_state
                         (Transition_frontier.Breadcrumb.protocol_state
                            (Transition_frontier.root frontier) ) )
                  in
                  let scan_state = Staged_ledger.scan_state staged_ledger in
                  (* Protocol states needed by the scan state's transactions
                     are looked up in the frontier. *)
                  let get_state hash =
                    match
                      Transition_frontier.find_protocol_state frontier hash
                    with
                    | None ->
                        Or_error.errorf
                          !"Protocol state (for scan state transactions) for \
                            %{sexp:State_hash.t} not found"
                          hash
                    | Some protocol_state ->
                        Ok protocol_state
                  in
                  let pending_coinbases =
                    Staged_ledger.pending_coinbase_collection staged_ledger
                  in
                  let%map actual_staged_ledger =
                    Deferred.Or_error.ok_exn
                      (Staged_ledger
                       .of_scan_state_pending_coinbases_and_snarked_ledger
                         ~scan_state ~logger ~verifier ~constraint_constants
                         ~snarked_ledger ~snarked_local_state
                         ~expected_merkle_root ~pending_coinbases ~get_state )
                  in
                  let expected_hash = Staged_ledger.hash staged_ledger in
                  let actual_hash = Staged_ledger.hash actual_staged_ledger in
                  assert (Staged_ledger_hash.equal expected_hash actual_hash) ) ) )
×
1038

1039
    (*
1040
    let%test_unit "if we see a new transition that is better than the \
1041
                   transition that we are syncing from, then we should \
1042
                   retarget our root" =
1043
      Quickcheck.test ~trials:1
1044
        Fake_network.Generator.(
1045
          gen ~max_frontier_length
1046
            [ fresh_peer
1047
            ; peer_with_branch ~frontier_branch_size:max_frontier_length
1048
            ; peer_with_branch
1049
                ~frontier_branch_size:((max_frontier_length * 2) + 2) ])
1050
        ~f:(fun fake_network ->
1051
          let [me; weaker_chain; stronger_chain] =
1052
            fake_network.peer_networks
1053
          in
1054
          let transition_reader, transition_writer =
1055
            Pipe_lib.Strict_pipe.create ~name:(__MODULE__ ^ __LOC__)
1056
              (Buffered (`Capacity 10, `Overflow Drop_head))
1057
          in
1058
          Envelope.Incoming.wrap
1059
            ~data:
1060
              ( Transition_frontier.best_tip weaker_chain.state.frontier
1061
              |> Transition_frontier.Breadcrumb.validated_transition
1062
              |> Mina_block.Validated.to_initial_validated )
1063
            ~sender:
1064
              (Envelope.Sender.Remote
1065
                 (weaker_chain.peer.host, weaker_chain.peer.peer_id))
1066
          |> Pipe_lib.Strict_pipe.Writer.write transition_writer ;
1067
          Envelope.Incoming.wrap
1068
            ~data:
1069
              ( Transition_frontier.best_tip stronger_chain.state.frontier
1070
              |> Transition_frontier.Breadcrumb.validated_transition
1071
              |> Mina_block.Validated.to_initial_validated )
1072
            ~sender:
1073
              (Envelope.Sender.Remote
1074
                 (stronger_chain.peer.host, stronger_chain.peer.peer_id))
1075
          |> Pipe_lib.Strict_pipe.Writer.write transition_writer ;
1076
          let new_frontier, sorted_external_transitions =
1077
            Async.Thread_safe.block_on_async_exn (fun () ->
1078
                run_bootstrap
1079
                  ~timeout_duration:(Block_time.Span.of_ms 60_000L)
1080
                  ~my_net:me ~transition_reader )
1081
          in
1082
          assert_transitions_increasingly_sorted
1083
            ~root:(Transition_frontier.root new_frontier)
1084
            sorted_external_transitions ;
1085
          [%test_result: Ledger_hash.t]
1086
            ( Ledger.Db.merkle_root
1087
            @@ Transition_frontier.root_snarked_ledger new_frontier )
1088
            ~expect:
1089
              ( Ledger.Db.merkle_root
1090
              @@ Transition_frontier.root_snarked_ledger
1091
                   stronger_chain.state.frontier ) )
1092
*)
1093
  end )
10✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc