• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

MinaProtocol / mina / 3551

31 Mar 2025 07:46PM UTC coverage: 36.547% (-24.3%) from 60.798%
3551

push

buildkite

web-flow
Merge pull request #16802 from MinaProtocol/dw/remove-sprs-plonk-wasm

plonk-wasm: remove sprs

26250 of 71825 relevant lines covered (36.55%)

49605.12 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.81
/src/lib/bootstrap_controller/bootstrap_controller.ml
1
(* Only show stdout for failed inline tests. *)
14✔
2
open Core
3
open Async
4
open Mina_base
5
module Ledger = Mina_ledger.Ledger
6
module Sync_ledger = Mina_ledger.Sync_ledger
7
open Mina_state
8
open Pipe_lib.Strict_pipe
9
open Network_peer
10
module Transition_cache = Transition_cache
11

12
module type CONTEXT = sig
13
  val logger : Logger.t
14

15
  val precomputed_values : Precomputed_values.t
16

17
  val constraint_constants : Genesis_constants.Constraint_constants.t
18

19
  val consensus_constants : Consensus.Constants.t
20

21
  val ledger_sync_config : Syncable_ledger.daemon_config
22

23
  val proof_cache_db : Proof_cache_tag.cache_db
24
end
25

26
type Structured_log_events.t += Bootstrap_complete
27
  [@@deriving register_event { msg = "Bootstrap state: complete." }]
3✔
28

29
type t =
30
  { context : (module CONTEXT)
31
  ; trust_system : Trust_system.t
32
  ; verifier : Verifier.t
33
  ; mutable best_seen_transition : Mina_block.initial_valid_block
34
  ; mutable current_root : Mina_block.initial_valid_block
35
  ; network : Mina_networking.t
36
  ; mutable num_of_root_snarked_ledger_retargeted : int
37
  }
38

39
type time = Time.Span.t
40

41
let time_to_yojson span =
42
  `String (Printf.sprintf "%f seconds" (Time.Span.to_sec span))
×
43

44
type opt_time = time option
45

46
let opt_time_to_yojson = function
47
  | Some time ->
×
48
      time_to_yojson time
49
  | None ->
×
50
      `Null
51

52
(** An auxiliary data structure for collecting various metrics for boostrap controller. *)
53
type bootstrap_cycle_stats =
×
54
  { cycle_result : string
×
55
  ; sync_ledger_time : time
×
56
  ; staged_ledger_data_download_time : time
×
57
  ; staged_ledger_construction_time : opt_time
×
58
  ; local_state_sync_required : bool
×
59
  ; local_state_sync_time : opt_time
×
60
  }
61
[@@deriving to_yojson]
62

63
let time_deferred deferred =
64
  let start_time = Time.now () in
×
65
  let%map result = deferred in
66
  let end_time = Time.now () in
×
67
  (Time.diff end_time start_time, result)
×
68

69
let worth_getting_root ({ context = (module Context); _ } as t) candidate =
70
  let module Consensus_context = struct
×
71
    include Context
72

73
    let logger =
74
      Logger.extend logger
×
75
        [ ( "selection_context"
76
          , `String "Bootstrap_controller.worth_getting_root" )
77
        ]
78
  end in
79
  Consensus.Hooks.equal_select_status `Take
80
  @@ Consensus.Hooks.select
81
       ~context:(module Consensus_context)
82
       ~existing:
83
         ( t.best_seen_transition |> Mina_block.Validation.block_with_hash
×
84
         |> With_hash.map ~f:Mina_block.consensus_state )
×
85
       ~candidate
86

87
let received_bad_proof ({ context = (module Context); _ } as t) host e =
88
  let open Context in
×
89
  Trust_system.(
90
    record t.trust_system logger host
91
      Actions.
92
        ( Violated_protocol
93
        , Some
94
            ( "Bad ancestor proof: $error"
95
            , [ ("error", Error_json.error_to_yojson e) ] ) ))
×
96

97
let done_syncing_root root_sync_ledger =
98
  Option.is_some (Sync_ledger.Db.peek_valid_tree root_sync_ledger)
×
99

100
let should_sync ~root_sync_ledger t candidate_state =
101
  (not @@ done_syncing_root root_sync_ledger)
×
102
  && worth_getting_root t candidate_state
×
103

104
(** Update [Synced_ledger]'s target and [best_seen_transition] and [current_root] accordingly. *)
105
let start_sync_job_with_peer ~sender ~root_sync_ledger
106
    ({ context = (module Context); _ } as t) peer_best_tip peer_root =
107
  let open Context in
×
108
  let%bind () =
109
    Trust_system.(
110
      record t.trust_system logger sender
×
111
        Actions.
112
          ( Fulfilled_request
113
          , Some ("Received verified peer root and best tip", []) ))
114
  in
115
  t.best_seen_transition <- peer_best_tip ;
×
116
  t.current_root <- peer_root ;
117
  let blockchain_state =
118
    t.current_root |> Mina_block.Validation.block |> Mina_block.header
×
119
    |> Mina_block.Header.protocol_state |> Protocol_state.blockchain_state
×
120
  in
121
  let expected_staged_ledger_hash =
×
122
    blockchain_state |> Blockchain_state.staged_ledger_hash
123
  in
124
  let snarked_ledger_hash =
×
125
    blockchain_state |> Blockchain_state.snarked_ledger_hash
126
  in
127
  return
×
128
  @@
129
  match
130
    Sync_ledger.Db.new_goal root_sync_ledger
131
      (Frozen_ledger_hash.to_ledger_hash snarked_ledger_hash)
×
132
      ~data:
133
        ( State_hash.With_state_hashes.state_hash
×
134
          @@ Mina_block.Validation.block_with_hash t.current_root
×
135
        , sender
136
        , expected_staged_ledger_hash )
137
      ~equal:(fun (hash1, _, _) (hash2, _, _) -> State_hash.equal hash1 hash2)
×
138
  with
139
  | `New ->
×
140
      t.num_of_root_snarked_ledger_retargeted <-
141
        t.num_of_root_snarked_ledger_retargeted + 1 ;
142
      `Syncing_new_snarked_ledger
143
  | `Update_data ->
×
144
      `Updating_root_transition
145
  | `Repeat ->
×
146
      `Ignored
147

148
let to_consensus_state h =
149
  Mina_block.Header.protocol_state h |> Protocol_state.consensus_state
×
150

151
(** For each transition, this function would compare it with the existing one.
152
    If the incoming transition is better, then download the merkle list from
153
    that transition to its root and verify it. If we get a better root than
154
    the existing one, then reset the Sync_ledger's target by calling
155
    [start_sync_job_with_peer] function. *)
156
let on_transition ({ context = (module Context); _ } as t) ~sender
157
    ~root_sync_ledger candidate_header =
158
  let open Context in
×
159
  let candidate_consensus_state =
160
    With_hash.map ~f:to_consensus_state candidate_header
161
  in
162
  if not @@ should_sync ~root_sync_ledger t candidate_consensus_state then
×
163
    Deferred.return `Ignored
×
164
  else
165
    match%bind
166
      Mina_networking.get_ancestry t.network sender.Peer.peer_id
×
167
        (With_hash.map_hash candidate_consensus_state
×
168
           ~f:State_hash.State_hashes.state_hash )
169
    with
170
    | Error e ->
×
171
        [%log error]
×
172
          ~metadata:[ ("error", Error_json.error_to_yojson e) ]
×
173
          !"Could not get the proof of the root transition from the network: \
174
            $error" ;
175
        Deferred.return `Ignored
×
176
    | Ok peer_root_with_proof -> (
×
177
        match%bind
178
          Mina_block.verify_on_header
×
179
            ~verify:
180
              (Sync_handler.Root.verify
×
181
                 ~context:(module Context)
182
                 ~verifier:t.verifier candidate_consensus_state )
183
            peer_root_with_proof.data
184
        with
185
        | Ok (`Root root, `Best_tip best_tip) ->
×
186
            if done_syncing_root root_sync_ledger then return `Ignored
×
187
            else
188
              start_sync_job_with_peer ~sender ~root_sync_ledger t best_tip root
×
189
        | Error e ->
×
190
            return (received_bad_proof t sender e |> Fn.const `Ignored) )
×
191

192
(** A helper function that wraps the calls to Sync_ledger and iterate through
193
    incoming transitions, add those to the transition_cache and calls
194
    [on_transition] function. *)
195
let sync_ledger ({ context = (module Context); _ } as t) ~preferred
196
    ~root_sync_ledger ~transition_graph ~sync_ledger_reader =
197
  let open Context in
×
198
  let query_reader = Sync_ledger.Db.query_reader root_sync_ledger in
199
  let response_writer = Sync_ledger.Db.answer_writer root_sync_ledger in
×
200
  Mina_networking.glue_sync_ledger ~preferred t.network query_reader
×
201
    response_writer ;
202
  Reader.iter sync_ledger_reader ~f:(fun (b_or_h, `Valid_cb vc) ->
×
203
      let header_with_hash, sender, transition_cache_element =
×
204
        match b_or_h with
205
        | `Block b_env ->
×
206
            ( Envelope.Incoming.data b_env
×
207
              |> Mina_block.Validation.block_with_hash
×
208
              |> With_hash.map ~f:Mina_block.header
×
209
            , Envelope.Incoming.remote_sender_exn b_env
×
210
            , Envelope.Incoming.map ~f:(fun x -> `Block x) b_env )
×
211
        | `Header h_env ->
×
212
            ( Envelope.Incoming.data h_env
×
213
              |> Mina_block.Validation.header_with_hash
×
214
            , Envelope.Incoming.remote_sender_exn h_env
×
215
            , Envelope.Incoming.map ~f:(fun x -> `Header x) h_env )
×
216
      in
217
      let previous_state_hash =
218
        With_hash.data header_with_hash
×
219
        |> Mina_block.Header.protocol_state
×
220
        |> Protocol_state.previous_state_hash
221
      in
222
      Transition_cache.add transition_graph ~parent:previous_state_hash
×
223
        (transition_cache_element, vc) ;
224
      (* TODO: Efficiently limiting the number of green threads in #1337 *)
225
      if
×
226
        worth_getting_root t
227
          (With_hash.map ~f:to_consensus_state header_with_hash)
×
228
      then (
×
229
        [%log trace] "Added the transition from sync_ledger_reader into cache"
×
230
          ~metadata:
231
            [ ( "state_hash"
232
              , State_hash.to_yojson
×
233
                  (State_hash.With_state_hashes.state_hash header_with_hash) )
×
234
            ; ( "header"
235
              , Mina_block.Header.to_yojson (With_hash.data header_with_hash) )
×
236
            ] ;
237

238
        Deferred.ignore_m
×
239
        @@ on_transition t ~sender ~root_sync_ledger header_with_hash )
×
240
      else Deferred.unit )
×
241

242
let external_transition_compare ~context:(module Context : CONTEXT) =
243
  let get_consensus_state =
×
244
    Fn.compose Protocol_state.consensus_state Mina_block.Header.protocol_state
245
  in
246
  Comparable.lift
×
247
    (fun existing candidate ->
248
      (* To prevent the logger to spam a lot of messsages, the logger input is set to null *)
249
      if
×
250
        State_hash.equal
251
          (State_hash.With_state_hashes.state_hash existing)
×
252
          (State_hash.With_state_hashes.state_hash candidate)
×
253
      then 0
×
254
      else if
×
255
        Consensus.Hooks.equal_select_status `Keep
256
        @@ Consensus.Hooks.select ~context:(module Context) ~existing ~candidate
257
      then -1
×
258
      else 1 )
×
259
    ~f:(With_hash.map ~f:get_consensus_state)
260

261
(** The entry point function for bootstrap controller. When bootstrap finished
262
    it would return a transition frontier with the root breadcrumb and a list
263
    of transitions collected during bootstrap.
264

265
    Bootstrap controller would do the following steps to contrust the
266
    transition frontier:
267
    1. Download the root snarked_ledger.
268
    2. Download the scan state and pending coinbases.
269
    3. Construct the staged ledger from the snarked ledger, scan state and
270
       pending coinbases.
271
    4. Synchronize the consensus local state if necessary.
272
    5. Close the old frontier and reload a new one from disk.
273
 *)
274
let run ~context:(module Context : CONTEXT) ~trust_system ~verifier ~network
275
    ~consensus_local_state ~transition_reader ~preferred_peers ~persistent_root
276
    ~persistent_frontier ~initial_root_transition ~catchup_mode =
277
  let open Context in
×
278
  O1trace.thread "bootstrap" (fun () ->
279
      let rec loop previous_cycles =
×
280
        let sync_ledger_pipe = "sync ledger pipe" in
×
281
        let sync_ledger_reader, sync_ledger_writer =
282
          create ~name:sync_ledger_pipe
283
            (Buffered
284
               ( `Capacity 50
285
               , `Overflow
286
                   (Drop_head
287
                      (fun (b_or_h, `Valid_cb valid_cb) ->
288
                        let hash =
×
289
                          match b_or_h with
290
                          | `Block b_env ->
×
291
                              Envelope.Incoming.data b_env
×
292
                              |> Mina_block.Validation.block_with_hash
×
293
                              |> With_hash.hash
×
294
                          | `Header h_env ->
×
295
                              Envelope.Incoming.data h_env
×
296
                              |> Mina_block.Validation.header_with_hash
×
297
                              |> With_hash.hash
×
298
                        in
299
                        Mina_metrics.(
300
                          Counter.inc_one
×
301
                            Pipe.Drop_on_overflow.bootstrap_sync_ledger) ;
302
                        Mina_block.handle_dropped_transition ?valid_cb hash
303
                          ~pipe_name:sync_ledger_pipe ~logger ) ) ) )
304
        in
305
        don't_wait_for
×
306
          (transfer_while_writer_alive transition_reader sync_ledger_writer
×
307
             ~f:Fn.id ) ;
308
        let initial_root_transition =
×
309
          initial_root_transition |> Mina_block.Validated.remember
×
310
          |> Mina_block.Validation.reset_frontier_dependencies_validation
×
311
          |> Mina_block.Validation.reset_staged_ledger_diff_validation
312
        in
313
        let t =
×
314
          { network
315
          ; context = (module Context)
316
          ; trust_system
317
          ; verifier
318
          ; best_seen_transition = initial_root_transition
319
          ; current_root = initial_root_transition
320
          ; num_of_root_snarked_ledger_retargeted = 0
321
          }
322
        in
323
        let transition_graph = Transition_cache.create () in
324
        let temp_persistent_root_instance =
×
325
          Transition_frontier.Persistent_root.create_instance_exn
326
            persistent_root
327
        in
328
        let temp_snarked_ledger =
×
329
          Transition_frontier.Persistent_root.Instance.snarked_ledger
330
            temp_persistent_root_instance
331
        in
332
        (* step 1. download snarked_ledger *)
333
        let%bind sync_ledger_time, (hash, sender, expected_staged_ledger_hash) =
334
          time_deferred
×
335
            (let root_sync_ledger =
336
               Sync_ledger.Db.create temp_snarked_ledger
337
                 ~context:(module Context)
338
                 ~trust_system
339
             in
340
             don't_wait_for
×
341
               (sync_ledger t ~preferred:preferred_peers ~root_sync_ledger
×
342
                  ~transition_graph ~sync_ledger_reader ) ;
343
             (* We ignore the resulting ledger returned here since it will always
344
                * be the same as the ledger we started with because we are syncing
345
                * a db ledger. *)
346
             let%map _, data = Sync_ledger.Db.valid_tree root_sync_ledger in
×
347
             Sync_ledger.Db.destroy root_sync_ledger ;
×
348
             data )
×
349
        in
350
        Mina_metrics.(
×
351
          Counter.inc Bootstrap.root_snarked_ledger_sync_ms
×
352
            Time.Span.(to_ms sync_ledger_time)) ;
×
353
        Mina_metrics.(
354
          Gauge.set Bootstrap.num_of_root_snarked_ledger_retargeted
×
355
            (Float.of_int t.num_of_root_snarked_ledger_retargeted)) ;
×
356
        (* step 2. Download scan state and pending coinbases. *)
357
        let%bind ( staged_ledger_data_download_time
358
                 , staged_ledger_construction_time
359
                 , staged_ledger_aux_result ) =
360
          let%bind ( staged_ledger_data_download_time
361
                   , staged_ledger_data_download_result ) =
362
            time_deferred
×
363
              (Mina_networking
364
               .get_staged_ledger_aux_and_pending_coinbases_at_hash t.network
×
365
                 sender.peer_id hash )
366
          in
367
          match staged_ledger_data_download_result with
×
368
          | Error err ->
×
369
              Deferred.return (staged_ledger_data_download_time, None, Error err)
370
          | Ok
×
371
              ( scan_state_uncached
372
              , expected_merkle_root
373
              , pending_coinbases
374
              , protocol_states ) -> (
375
              let%map staged_ledger_construction_result =
376
                O1trace.thread "construct_root_staged_ledger" (fun () ->
×
377
                    let open Deferred.Or_error.Let_syntax in
×
378
                    let received_staged_ledger_hash =
379
                      Staged_ledger_hash.of_aux_ledger_and_coinbase_hash
380
                        (Staged_ledger.Scan_state.Stable.Latest.hash
×
381
                           scan_state_uncached )
382
                        expected_merkle_root pending_coinbases
383
                    in
384
                    [%log debug]
×
385
                      ~metadata:
386
                        [ ( "expected_staged_ledger_hash"
387
                          , Staged_ledger_hash.to_yojson
×
388
                              expected_staged_ledger_hash )
389
                        ; ( "received_staged_ledger_hash"
390
                          , Staged_ledger_hash.to_yojson
×
391
                              received_staged_ledger_hash )
392
                        ]
393
                      "Comparing $expected_staged_ledger_hash to \
394
                       $received_staged_ledger_hash" ;
395
                    let%bind new_root =
396
                      t.current_root
397
                      |> Mina_block.Validation
398
                         .skip_frontier_dependencies_validation
×
399
                           `This_block_belongs_to_a_detached_subtree
400
                      |> Mina_block.Validation.validate_staged_ledger_hash
×
401
                           (`Staged_ledger_already_materialized
402
                             received_staged_ledger_hash )
403
                      |> Result.map_error ~f:(fun _ ->
×
404
                             Error.of_string
×
405
                               "received faulty scan state from peer" )
406
                      |> Deferred.return
×
407
                    in
408
                    let protocol_states =
×
409
                      List.map protocol_states
410
                        ~f:(With_hash.of_data ~hash_data:Protocol_state.hashes)
411
                    in
412
                    let scan_state =
×
413
                      Staged_ledger.Scan_state.write_all_proofs_to_disk
414
                        ~proof_cache_db scan_state_uncached
415
                    in
416
                    let%bind protocol_states =
417
                      Staged_ledger.Scan_state.check_required_protocol_states
×
418
                        scan_state ~protocol_states
419
                      |> Deferred.return
×
420
                    in
421
                    let protocol_states_map =
×
422
                      protocol_states
423
                      |> List.map ~f:(fun ps ->
×
424
                             (State_hash.With_state_hashes.state_hash ps, ps) )
×
425
                      |> State_hash.Map.of_alist_exn
426
                    in
427
                    let get_state hash =
×
428
                      match Map.find protocol_states_map hash with
×
429
                      | None ->
×
430
                          let new_state_hash =
431
                            State_hash.With_state_hashes.state_hash
432
                              (fst new_root)
×
433
                          in
434
                          [%log error]
×
435
                            ~metadata:
436
                              [ ("new_root", State_hash.to_yojson new_state_hash)
×
437
                              ; ("state_hash", State_hash.to_yojson hash)
×
438
                              ]
439
                            "Protocol state (for scan state transactions) for \
440
                             $state_hash not found when bootstrapping to the \
441
                             new root $new_root" ;
442
                          Or_error.errorf
×
443
                            !"Protocol state (for scan state transactions) for \
×
444
                              %{sexp:State_hash.t} not found when \
445
                              bootstrapping to the new root \
446
                              %{sexp:State_hash.t}"
447
                            hash new_state_hash
448
                      | Some protocol_state ->
×
449
                          Ok (With_hash.data protocol_state)
×
450
                    in
451
                    (* step 3. Construct staged ledger from snarked ledger, scan state
452
                       and pending coinbases. *)
453
                    (* Construct the staged ledger before constructing the transition
454
                     * frontier in order to verify the scan state we received.
455
                     * TODO: reorganize the code to avoid doing this twice (#3480) *)
456
                    let open Deferred.Let_syntax in
457
                    let%map staged_ledger_construction_time, construction_result
458
                        =
459
                      time_deferred
×
460
                        (let open Deferred.Let_syntax in
461
                        let temp_mask =
462
                          Ledger.of_database temp_snarked_ledger
463
                        in
464
                        let%map result =
465
                          Staged_ledger
466
                          .of_scan_state_pending_coinbases_and_snarked_ledger
467
                            ~logger
468
                            ~snarked_local_state:
469
                              Mina_block.(
470
                                t.current_root |> Validation.block |> header
×
471
                                |> Header.protocol_state
×
472
                                |> Protocol_state.blockchain_state
×
473
                                |> Blockchain_state.snarked_local_state)
×
474
                            ~verifier ~constraint_constants ~scan_state
475
                            ~snarked_ledger:temp_mask ~expected_merkle_root
476
                            ~pending_coinbases ~get_state
477
                        in
478
                        ignore
×
479
                          ( Ledger.Maskable.unregister_mask_exn ~loc:__LOC__
×
480
                              temp_mask
481
                            : Ledger.unattached_mask ) ;
482
                        Result.map result
483
                          ~f:
484
                            (const
×
485
                               ( scan_state
486
                               , pending_coinbases
487
                               , new_root
488
                               , protocol_states ) ))
489
                    in
490
                    Ok (staged_ledger_construction_time, construction_result) )
×
491
              in
492
              match staged_ledger_construction_result with
×
493
              | Error err ->
×
494
                  (staged_ledger_data_download_time, None, Error err)
495
              | Ok (staged_ledger_construction_time, result) ->
×
496
                  ( staged_ledger_data_download_time
497
                  , Some staged_ledger_construction_time
498
                  , result ) )
499
        in
500
        Transition_frontier.Persistent_root.Instance.close
×
501
          temp_persistent_root_instance ;
502
        match staged_ledger_aux_result with
×
503
        | Error e ->
×
504
            let%bind () =
505
              Trust_system.(
506
                record t.trust_system logger sender
×
507
                  Actions.
508
                    ( Outgoing_connection_error
509
                    , Some
510
                        ( "Can't find scan state from the peer or received \
511
                           faulty scan state from the peer."
512
                        , [] ) ))
513
            in
514
            [%log error]
×
515
              ~metadata:
516
                [ ("error", Error_json.error_to_yojson e)
×
517
                ; ("state_hash", State_hash.to_yojson hash)
×
518
                ; ( "expected_staged_ledger_hash"
519
                  , Staged_ledger_hash.to_yojson expected_staged_ledger_hash )
×
520
                ]
521
              "Failed to find scan state for the transition with hash \
522
               $state_hash from the peer or received faulty scan state: \
523
               $error. Retry bootstrap" ;
524
            Writer.close sync_ledger_writer ;
×
525
            let this_cycle =
×
526
              { cycle_result = "failed to download and construct scan state"
527
              ; sync_ledger_time
528
              ; staged_ledger_data_download_time
529
              ; staged_ledger_construction_time
530
              ; local_state_sync_required = false
531
              ; local_state_sync_time = None
532
              }
533
            in
534
            loop (this_cycle :: previous_cycles)
535
        | Ok (scan_state, pending_coinbase, new_root, protocol_states) -> (
×
536
            let%bind () =
537
              Trust_system.(
538
                record t.trust_system logger sender
×
539
                  Actions.
540
                    ( Fulfilled_request
541
                    , Some ("Received valid scan state from peer", []) ))
542
            in
543
            let best_seen_block_with_hash, _ = t.best_seen_transition in
×
544
            let consensus_state =
545
              With_hash.data best_seen_block_with_hash
×
546
              |> Mina_block.header |> Mina_block.Header.protocol_state
×
547
              |> Protocol_state.consensus_state
548
            in
549
            (* step 4. Synchronize consensus local state if necessary *)
550
            let%bind ( local_state_sync_time
551
                     , (local_state_sync_required, local_state_sync_result) ) =
552
              time_deferred
×
553
                ( match
554
                    Consensus.Hooks.required_local_state_sync
555
                      ~constants:precomputed_values.consensus_constants
556
                      ~consensus_state ~local_state:consensus_local_state
557
                  with
558
                | None ->
×
559
                    [%log debug]
×
560
                      ~metadata:
561
                        [ ( "local_state"
562
                          , Consensus.Data.Local_state.to_yojson
×
563
                              consensus_local_state )
564
                        ; ( "consensus_state"
565
                          , Consensus.Data.Consensus_state.Value.to_yojson
×
566
                              consensus_state )
567
                        ]
568
                      "Not synchronizing consensus local state" ;
569
                    Deferred.return (false, Or_error.return ())
×
570
                | Some sync_jobs ->
×
571
                    [%log info] "Synchronizing consensus local state" ;
×
572
                    let%map result =
573
                      Consensus.Hooks.sync_local_state
×
574
                        ~context:(module Context)
575
                        ~local_state:consensus_local_state ~trust_system
576
                        ~glue_sync_ledger:
577
                          (Mina_networking.glue_sync_ledger t.network)
×
578
                        sync_jobs
579
                    in
580
                    (true, result) )
×
581
            in
582
            match local_state_sync_result with
×
583
            | Error e ->
×
584
                [%log error]
×
585
                  ~metadata:[ ("error", Error_json.error_to_yojson e) ]
×
586
                  "Local state sync failed: $error. Retry bootstrap" ;
587
                Writer.close sync_ledger_writer ;
×
588
                let this_cycle =
×
589
                  { cycle_result = "failed to synchronize local state"
590
                  ; sync_ledger_time
591
                  ; staged_ledger_data_download_time
592
                  ; staged_ledger_construction_time
593
                  ; local_state_sync_required
594
                  ; local_state_sync_time = Some local_state_sync_time
595
                  }
596
                in
597
                loop (this_cycle :: previous_cycles)
598
            | Ok () ->
×
599
                (* step 5. Close the old frontier and reload a new one from disk. *)
600
                let new_root_data : Transition_frontier.Root_data.Limited.t =
601
                  Transition_frontier.Root_data.Limited.create
602
                    ~transition:(Mina_block.Validated.lift new_root)
×
603
                    ~scan_state ~pending_coinbase ~protocol_states
604
                in
605
                let%bind () =
606
                  Transition_frontier.Persistent_frontier.reset_database_exn
×
607
                    persistent_frontier ~root_data:new_root_data
608
                    ~genesis_state_hash:
609
                      (State_hash.With_state_hashes.state_hash
×
610
                         precomputed_values.protocol_state_with_hashes )
611
                in
612
                (* TODO: lazy load db in persistent root to avoid unecessary opens like this *)
613
                Transition_frontier.Persistent_root.(
×
614
                  with_instance_exn persistent_root ~f:(fun instance ->
×
615
                      Instance.set_root_state_hash instance
×
616
                      @@ Mina_block.Validated.state_hash
×
617
                      @@ Mina_block.Validated.lift new_root )) ;
×
618
                let%map new_frontier =
619
                  let fail msg =
620
                    failwith
×
621
                      ( "failed to initialize transition frontier after \
622
                         bootstrapping: " ^ msg )
623
                  in
624
                  Transition_frontier.load
×
625
                    ~context:(module Context)
626
                    ~retry_with_fresh_db:false ~verifier ~consensus_local_state
627
                    ~persistent_root ~persistent_frontier ~catchup_mode ()
628
                  >>| function
×
629
                  | Ok frontier ->
×
630
                      frontier
631
                  | Error (`Failure msg) ->
×
632
                      fail msg
633
                  | Error `Bootstrap_required ->
×
634
                      fail
635
                        "bootstrap still required (indicates logical error in \
636
                         code)"
637
                  | Error `Persistent_frontier_malformed ->
×
638
                      fail "persistent frontier was malformed"
639
                  | Error `Snarked_ledger_mismatch ->
×
640
                      fail
641
                        "this should not happen, because we just reset the \
642
                         snarked_ledger"
643
                in
644
                [%str_log info] Bootstrap_complete ;
×
645
                let collected_transitions =
×
646
                  Transition_cache.data transition_graph
647
                in
648
                let logger =
×
649
                  Logger.extend logger
650
                    [ ( "context"
651
                      , `String "Filter collected transitions in bootstrap" )
652
                    ]
653
                in
654
                let root_consensus_state =
×
655
                  Transition_frontier.(
656
                    Breadcrumb.consensus_state_with_hashes (root new_frontier))
×
657
                in
658
                let filtered_collected_transitions =
659
                  List.filter collected_transitions
660
                    ~f:(fun (incoming_transition, _) ->
661
                      let transition =
×
662
                        Envelope.Incoming.data incoming_transition
×
663
                        |> Transition_cache.header_with_hash
664
                      in
665
                      Consensus.Hooks.equal_select_status `Take
×
666
                      @@ Consensus.Hooks.select
667
                           ~context:(module Context)
668
                           ~existing:root_consensus_state
669
                           ~candidate:
670
                             (With_hash.map
×
671
                                ~f:
672
                                  (Fn.compose Protocol_state.consensus_state
×
673
                                     Mina_block.Header.protocol_state )
674
                                transition ) )
675
                in
676
                [%log debug] "Sorting filtered transitions by consensus state"
×
677
                  ~metadata:[] ;
678
                let sorted_filtered_collected_transitions =
×
679
                  O1trace.sync_thread "sorting_collected_transitions" (fun () ->
680
                      List.sort filtered_collected_transitions
×
681
                        ~compare:
682
                          (Comparable.lift
×
683
                             ~f:(fun (x, _) ->
684
                               Transition_cache.header_with_hash
×
685
                               @@ Envelope.Incoming.data x )
×
686
                             (external_transition_compare
687
                                ~context:(module Context) ) ) )
688
                in
689
                let this_cycle =
×
690
                  { cycle_result = "success"
691
                  ; sync_ledger_time
692
                  ; staged_ledger_data_download_time
693
                  ; staged_ledger_construction_time
694
                  ; local_state_sync_required
695
                  ; local_state_sync_time = Some local_state_sync_time
696
                  }
697
                in
698
                ( this_cycle :: previous_cycles
699
                , (new_frontier, sorted_filtered_collected_transitions) ) )
700
      in
701
      let%map time_elapsed, (cycles, result) = time_deferred (loop []) in
×
702
      [%log info] "Bootstrap completed in $time_elapsed: $bootstrap_stats"
×
703
        ~metadata:
704
          [ ("time_elapsed", time_to_yojson time_elapsed)
×
705
          ; ( "bootstrap_stats"
706
            , `List (List.map ~f:bootstrap_cycle_stats_to_yojson cycles) )
×
707
          ] ;
708
      Mina_metrics.(
×
709
        Gauge.set Bootstrap.bootstrap_time_ms
×
710
          Core.Time.(Span.to_ms @@ time_elapsed)) ;
×
711
      result )
712

713
let%test_module "Bootstrap_controller tests" =
714
  ( module struct
715
    open Pipe_lib
716

717
    let max_frontier_length =
718
      Transition_frontier.global_max_length Genesis_constants.For_unit_tests.t
×
719

720
    let logger = Logger.null ()
×
721

722
    let () =
723
      (* Disable log messages from best_tip_diff logger. *)
724
      Logger.Consumer_registry.register ~commit_id:""
×
725
        ~id:Logger.Logger_id.best_tip_diff ~processor:(Logger.Processor.raw ())
×
726
        ~transport:
727
          (Logger.Transport.create
×
728
             ( module struct
729
               type t = unit
730

731
               let transport () _ = ()
×
732
             end )
733
             () )
734
        ()
735

736
    let trust_system =
737
      let s = Trust_system.null () in
738
      don't_wait_for
×
739
        (Pipe_lib.Strict_pipe.Reader.iter
×
740
           (Trust_system.upcall_pipe s)
×
741
           ~f:(const Deferred.unit) ) ;
×
742
      s
×
743

744
    let precomputed_values = Lazy.force Precomputed_values.for_unit_tests
×
745

746
    let proof_level = precomputed_values.proof_level
747

748
    let constraint_constants = precomputed_values.constraint_constants
749

750
    let ledger_sync_config =
751
      Syncable_ledger.create_config
×
752
        ~compile_config:Mina_compile_config.For_unit_tests.t
753
        ~max_subtree_depth:None ~default_subtree_depth:None ()
754

755
    module Context = struct
756
      let logger = logger
757

758
      let precomputed_values = precomputed_values
759

760
      let constraint_constants =
761
        Genesis_constants.For_unit_tests.Constraint_constants.t
762

763
      let consensus_constants = precomputed_values.consensus_constants
764

765
      let ledger_sync_config =
766
        Syncable_ledger.create_config
×
767
          ~compile_config:Mina_compile_config.For_unit_tests.t
768
          ~max_subtree_depth:None ~default_subtree_depth:None ()
769

770
      let proof_cache_db = Proof_cache_tag.For_tests.create_db ()
×
771
    end
772

773
    let verifier =
774
      Async.Thread_safe.block_on_async_exn (fun () ->
×
775
          Verifier.For_tests.default ~constraint_constants ~logger ~proof_level
×
776
            () )
777

778
    module Genesis_ledger = (val precomputed_values.genesis_ledger)
779

780
    let downcast_transition ~sender transition =
781
      let transition =
×
782
        transition |> Mina_block.Validated.remember
×
783
        |> Mina_block.Validation.reset_frontier_dependencies_validation
×
784
        |> Mina_block.Validation.reset_staged_ledger_diff_validation
785
      in
786
      Envelope.Incoming.wrap ~data:transition
×
787
        ~sender:(Envelope.Sender.Remote sender)
788

789
    let downcast_breadcrumb ~sender breadcrumb =
790
      downcast_transition ~sender
×
791
        (Transition_frontier.Breadcrumb.validated_transition breadcrumb)
×
792

793
    let make_non_running_bootstrap ~genesis_root ~network =
794
      let transition =
×
795
        genesis_root
796
        |> Mina_block.Validation.reset_frontier_dependencies_validation
×
797
        |> Mina_block.Validation.reset_staged_ledger_diff_validation
798
      in
799
      { context = (module Context)
×
800
      ; trust_system
801
      ; verifier
802
      ; best_seen_transition = transition
803
      ; current_root = transition
804
      ; network
805
      ; num_of_root_snarked_ledger_retargeted = 0
806
      }
807

808
    let%test_unit "Bootstrap controller caches all transitions it is passed \
809
                   through the transition_reader" =
810
      let branch_size = (max_frontier_length * 2) + 2 in
×
811
      Quickcheck.test ~trials:1
812
        (let open Quickcheck.Generator.Let_syntax in
813
        (* we only need one node for this test, but we need more than one peer so that mina_networking does not throw an error *)
814
        let%bind fake_network =
815
          Fake_network.Generator.(
816
            gen ~precomputed_values ~verifier ~max_frontier_length
×
817
              ~ledger_sync_config [ fresh_peer; fresh_peer ]
818
              ~use_super_catchup:false)
819
        in
820
        let%map make_branch =
821
          Transition_frontier.Breadcrumb.For_tests.gen_seq ~precomputed_values
×
822
            ~verifier
823
            ~accounts_with_secret_keys:(Lazy.force Genesis_ledger.accounts)
×
824
            branch_size
825
        in
826
        let [ me; _ ] = fake_network.peer_networks in
×
827
        let branch =
828
          Async.Thread_safe.block_on_async_exn (fun () ->
829
              make_branch (Transition_frontier.root me.state.frontier) )
×
830
        in
831
        (fake_network, branch))
×
832
        ~f:(fun (fake_network, branch) ->
833
          let [ me; other ] = fake_network.peer_networks in
×
834
          let genesis_root =
835
            Transition_frontier.(
836
              Breadcrumb.validated_transition @@ root me.state.frontier)
×
837
            |> Mina_block.Validated.remember
838
          in
839
          let transition_graph = Transition_cache.create () in
×
840
          let sync_ledger_reader, sync_ledger_writer =
×
841
            Pipe_lib.Strict_pipe.create ~name:"sync_ledger_reader" Synchronous
842
          in
843
          let bootstrap =
×
844
            make_non_running_bootstrap ~genesis_root ~network:me.network
845
          in
846
          let root_sync_ledger =
847
            Sync_ledger.Db.create
848
              (Transition_frontier.root_snarked_ledger me.state.frontier)
×
849
              ~context:(module Context)
850
              ~trust_system
851
          in
852
          Async.Thread_safe.block_on_async_exn (fun () ->
×
853
              let sync_deferred =
×
854
                sync_ledger bootstrap ~root_sync_ledger ~transition_graph
855
                  ~preferred:[] ~sync_ledger_reader
856
              in
857
              let%bind () =
858
                Deferred.List.iter branch ~f:(fun breadcrumb ->
×
859
                    Strict_pipe.Writer.write sync_ledger_writer
×
860
                      ( `Block
861
                          (downcast_breadcrumb ~sender:other.peer breadcrumb)
×
862
                      , `Valid_cb None ) )
863
              in
864
              Strict_pipe.Writer.close sync_ledger_writer ;
×
865
              sync_deferred ) ;
×
866
          let expected_transitions =
×
867
            List.map branch
868
              ~f:
869
                (Fn.compose
×
870
                   (With_hash.map ~f:Mina_block.header)
871
                   (Fn.compose Mina_block.Validation.block_with_hash
×
872
                      (Fn.compose Mina_block.Validated.remember
×
873
                         Transition_frontier.Breadcrumb.validated_transition ) ) )
874
          in
875
          let saved_transitions =
×
876
            Transition_cache.data transition_graph
×
877
            |> List.map ~f:(fun (x, _) ->
878
                   Transition_cache.header_with_hash @@ Envelope.Incoming.data x )
×
879
          in
880
          let module E = struct
×
881
            module T = struct
882
              type t = Mina_block.Header.t State_hash.With_state_hashes.t
×
883
              [@@deriving sexp]
884

885
              let compare = external_transition_compare ~context:(module Context)
886
            end
887

888
            include Comparable.Make (T)
889
          end in
890
          [%test_result: E.Set.t]
×
891
            (E.Set.of_list saved_transitions)
×
892
            ~expect:(E.Set.of_list expected_transitions) )
×
893

894
    let run_bootstrap ~timeout_duration ~my_net ~transition_reader =
895
      let open Fake_network in
×
896
      let time_controller = Block_time.Controller.basic ~logger in
897
      let persistent_root =
898
        Transition_frontier.persistent_root my_net.state.frontier
899
      in
900
      let persistent_frontier =
×
901
        Transition_frontier.persistent_frontier my_net.state.frontier
902
      in
903
      let initial_root_transition =
×
904
        Transition_frontier.(
905
          Breadcrumb.validated_transition (root my_net.state.frontier))
×
906
      in
907
      let%bind () =
908
        Transition_frontier.close ~loc:__LOC__ my_net.state.frontier
×
909
      in
910
      [%log info] "bootstrap begin" ;
×
911
      Block_time.Timeout.await_exn time_controller ~timeout_duration
×
912
        (run
913
           ~context:(module Context)
914
           ~trust_system ~verifier ~network:my_net.network ~preferred_peers:[]
915
           ~consensus_local_state:my_net.state.consensus_local_state
916
           ~transition_reader ~persistent_root ~persistent_frontier
917
           ~catchup_mode:`Normal ~initial_root_transition )
918

919
    let assert_transitions_increasingly_sorted ~root
920
        (incoming_transitions : Transition_cache.element list) =
921
      let root =
×
922
        Transition_frontier.Breadcrumb.block root |> Mina_block.header
×
923
      in
924
      ignore
×
925
        ( List.fold_result ~init:root incoming_transitions
×
926
            ~f:(fun max_acc incoming_transition ->
927
              let With_hash.{ data = header; _ } =
×
928
                Transition_cache.header_with_hash
929
                  (Envelope.Incoming.data @@ fst incoming_transition)
×
930
              in
931
              let header_len h =
×
932
                Mina_block.Header.protocol_state h
×
933
                |> Protocol_state.consensus_state
×
934
                |> Consensus.Data.Consensus_state.blockchain_length
935
              in
936
              let open Result.Let_syntax in
937
              let%map () =
938
                Result.ok_if_true
×
939
                  Mina_numbers.Length.(header_len max_acc <= header_len header)
×
940
                  ~error:
941
                    (Error.of_string
×
942
                       "The blocks are not sorted in increasing order" )
943
              in
944
              header )
×
945
          |> Or_error.ok_exn
×
946
          : Mina_block.Header.t )
947

948
    let%test_unit "sync with one node after receiving a transition" =
949
      Quickcheck.test ~trials:1
×
950
        Fake_network.Generator.(
951
          gen ~precomputed_values ~verifier ~max_frontier_length
×
952
            ~use_super_catchup:false ~ledger_sync_config
953
            [ fresh_peer
954
            ; peer_with_branch
955
                ~frontier_branch_size:((max_frontier_length * 2) + 2)
956
            ])
957
        ~f:(fun fake_network ->
958
          let [ my_net; peer_net ] = fake_network.peer_networks in
×
959
          let transition_reader, transition_writer =
960
            Pipe_lib.Strict_pipe.create ~name:(__MODULE__ ^ __LOC__)
961
              (Buffered (`Capacity 10, `Overflow (Drop_head ignore)))
962
          in
963
          let block =
×
964
            Envelope.Incoming.wrap
965
              ~data:
966
                ( Transition_frontier.best_tip peer_net.state.frontier
×
967
                |> Transition_frontier.Breadcrumb.validated_transition
×
968
                |> Mina_block.Validated.remember
×
969
                |> Mina_block.Validation.reset_frontier_dependencies_validation
×
970
                |> Mina_block.Validation.reset_staged_ledger_diff_validation )
×
971
              ~sender:(Envelope.Sender.Remote peer_net.peer)
972
          in
973
          Pipe_lib.Strict_pipe.Writer.write transition_writer
974
            (`Block block, `Valid_cb None) ;
975
          let new_frontier, sorted_external_transitions =
×
976
            Async.Thread_safe.block_on_async_exn (fun () ->
977
                run_bootstrap
×
978
                  ~timeout_duration:(Block_time.Span.of_ms 30_000L)
×
979
                  ~my_net ~transition_reader )
980
          in
981
          assert_transitions_increasingly_sorted
×
982
            ~root:(Transition_frontier.root new_frontier)
×
983
            sorted_external_transitions ;
984
          [%test_result: Ledger_hash.t]
×
985
            ( Ledger.Db.merkle_root
×
986
            @@ Transition_frontier.root_snarked_ledger new_frontier )
×
987
            ~expect:
988
              ( Ledger.Db.merkle_root
×
989
              @@ Transition_frontier.root_snarked_ledger peer_net.state.frontier
×
990
              ) )
991

992
    let%test_unit "reconstruct staged_ledgers using \
993
                   of_scan_state_and_snarked_ledger" =
994
      Quickcheck.test ~trials:1
×
995
        (Transition_frontier.For_tests.gen ~precomputed_values ~verifier
×
996
           ~max_length:max_frontier_length ~size:max_frontier_length () )
997
        ~f:(fun frontier ->
998
          Thread_safe.block_on_async_exn
×
999
          @@ fun () ->
1000
          Deferred.List.iter (Transition_frontier.all_breadcrumbs frontier)
×
1001
            ~f:(fun breadcrumb ->
1002
              let staged_ledger =
×
1003
                Transition_frontier.Breadcrumb.staged_ledger breadcrumb
1004
              in
1005
              let expected_merkle_root =
×
1006
                Staged_ledger.ledger staged_ledger |> Ledger.merkle_root
×
1007
              in
1008
              let snarked_ledger =
×
1009
                Transition_frontier.root_snarked_ledger frontier
×
1010
                |> Ledger.of_database
1011
              in
1012
              let snarked_local_state =
×
1013
                Transition_frontier.root frontier
×
1014
                |> Transition_frontier.Breadcrumb.protocol_state
×
1015
                |> Protocol_state.blockchain_state
×
1016
                |> Blockchain_state.snarked_local_state
1017
              in
1018
              let scan_state = Staged_ledger.scan_state staged_ledger in
×
1019
              let get_state hash =
×
1020
                match Transition_frontier.find_protocol_state frontier hash with
×
1021
                | Some protocol_state ->
×
1022
                    Ok protocol_state
1023
                | None ->
×
1024
                    Or_error.errorf
1025
                      !"Protocol state (for scan state transactions) for \
×
1026
                        %{sexp:State_hash.t} not found"
1027
                      hash
1028
              in
1029
              let pending_coinbases =
1030
                Staged_ledger.pending_coinbase_collection staged_ledger
1031
              in
1032
              let%map actual_staged_ledger =
1033
                Staged_ledger.of_scan_state_pending_coinbases_and_snarked_ledger
1034
                  ~scan_state ~logger ~verifier ~constraint_constants
1035
                  ~snarked_ledger ~snarked_local_state ~expected_merkle_root
1036
                  ~pending_coinbases ~get_state
1037
                |> Deferred.Or_error.ok_exn
×
1038
              in
1039
              let height =
×
1040
                Transition_frontier.Breadcrumb.consensus_state breadcrumb
×
1041
                |> Consensus.Data.Consensus_state.blockchain_length
×
1042
                |> Mina_numbers.Length.to_int
1043
              in
1044
              [%test_eq: Staged_ledger_hash.t]
×
1045
                ~message:
1046
                  (sprintf "mismatch of staged ledger hash height %d" height)
×
1047
                (Transition_frontier.Breadcrumb.staged_ledger_hash breadcrumb)
×
1048
                (Staged_ledger.hash actual_staged_ledger) ) )
×
1049

1050
    (*
1051
    let%test_unit "if we see a new transition that is better than the \
1052
                   transition that we are syncing from, than we should \
1053
                   retarget our root" =
1054
      Quickcheck.test ~trials:1
1055
        Fake_network.Generator.(
1056
          gen ~max_frontier_length
1057
            [ fresh_peer
1058
            ; peer_with_branch ~frontier_branch_size:max_frontier_length
1059
            ; peer_with_branch
1060
                ~frontier_branch_size:((max_frontier_length * 2) + 2) ])
1061
        ~f:(fun fake_network ->
1062
          let [me; weaker_chain; stronger_chain] =
1063
            fake_network.peer_networks
1064
          in
1065
          let transition_reader, transition_writer =
1066
            Pipe_lib.Strict_pipe.create ~name:(__MODULE__ ^ __LOC__)
1067
              (Buffered (`Capacity 10, `Overflow Drop_head))
1068
          in
1069
          Envelope.Incoming.wrap
1070
            ~data:
1071
              ( Transition_frontier.best_tip weaker_chain.state.frontier
1072
              |> Transition_frontier.Breadcrumb.validated_transition
1073
              |> Mina_block.Validated.to_initial_validated )
1074
            ~sender:
1075
              (Envelope.Sender.Remote
1076
                 (weaker_chain.peer.host, weaker_chain.peer.peer_id))
1077
          |> Pipe_lib.Strict_pipe.Writer.write transition_writer ;
1078
          Envelope.Incoming.wrap
1079
            ~data:
1080
              ( Transition_frontier.best_tip stronger_chain.state.frontier
1081
              |> Transition_frontier.Breadcrumb.validated_transition
1082
              |> Mina_block.Validated.to_initial_validated )
1083
            ~sender:
1084
              (Envelope.Sender.Remote
1085
                 (stronger_chain.peer.host, stronger_chain.peer.peer_id))
1086
          |> Pipe_lib.Strict_pipe.Writer.write transition_writer ;
1087
          let new_frontier, sorted_external_transitions =
1088
            Async.Thread_safe.block_on_async_exn (fun () ->
1089
                run_bootstrap
1090
                  ~timeout_duration:(Block_time.Span.of_ms 60_000L)
1091
                  ~my_net:me ~transition_reader )
1092
          in
1093
          assert_transitions_increasingly_sorted
1094
            ~root:(Transition_frontier.root new_frontier)
1095
            sorted_external_transitions ;
1096
          [%test_result: Ledger_hash.t]
1097
            ( Ledger.Db.merkle_root
1098
            @@ Transition_frontier.root_snarked_ledger new_frontier )
1099
            ~expect:
1100
              ( Ledger.Db.merkle_root
1101
              @@ Transition_frontier.root_snarked_ledger
1102
                   stronger_chain.state.frontier ) )
1103
*)
1104
  end )
28✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc