• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

MinaProtocol / mina / 2908

12 Nov 2024 12:30PM UTC coverage: 36.715% (-25.0%) from 61.682%
2908

push

buildkite

web-flow
Merge pull request #16333 from MinaProtocol/dkijania/port_new_deb_s3_dev

[DEV] Use new version of deb-s3 for validating job publishing

24544 of 66850 relevant lines covered (36.72%)

20720.94 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.83
/src/lib/bootstrap_controller/bootstrap_controller.ml
1
(* Only show stdout for failed inline tests. *)
2✔
2
open Inline_test_quiet_logs
3
open Core
4
open Async
5
open Mina_base
6
module Ledger = Mina_ledger.Ledger
7
module Sync_ledger = Mina_ledger.Sync_ledger
8
open Mina_state
9
open Pipe_lib.Strict_pipe
10
open Network_peer
11
module Transition_cache = Transition_cache
12

13
module type CONTEXT = sig
14
  val logger : Logger.t
15

16
  val precomputed_values : Precomputed_values.t
17

18
  val constraint_constants : Genesis_constants.Constraint_constants.t
19

20
  val consensus_constants : Consensus.Constants.t
21

22
  val compile_config : Mina_compile_config.t
23
end
24

25
type Structured_log_events.t += Bootstrap_complete
26
  [@@deriving register_event { msg = "Bootstrap state: complete." }]
3✔
27

28
type t =
29
  { context : (module CONTEXT)
30
  ; trust_system : Trust_system.t
31
  ; verifier : Verifier.t
32
  ; mutable best_seen_transition : Mina_block.initial_valid_block
33
  ; mutable current_root : Mina_block.initial_valid_block
34
  ; network : Mina_networking.t
35
  ; mutable num_of_root_snarked_ledger_retargeted : int
36
  }
37

38
type time = Time.Span.t
39

40
let time_to_yojson span =
41
  `String (Printf.sprintf "%f seconds" (Time.Span.to_sec span))
×
42

43
type opt_time = time option
44

45
let opt_time_to_yojson = function
46
  | Some time ->
×
47
      time_to_yojson time
48
  | None ->
×
49
      `Null
50

51
(** An auxiliary data structure for collecting various metrics for boostrap controller. *)
52
type bootstrap_cycle_stats =
×
53
  { cycle_result : string
×
54
  ; sync_ledger_time : time
×
55
  ; staged_ledger_data_download_time : time
×
56
  ; staged_ledger_construction_time : opt_time
×
57
  ; local_state_sync_required : bool
×
58
  ; local_state_sync_time : opt_time
×
59
  }
60
[@@deriving to_yojson]
61

62
let time_deferred deferred =
63
  let start_time = Time.now () in
×
64
  let%map result = deferred in
65
  let end_time = Time.now () in
×
66
  (Time.diff end_time start_time, result)
×
67

68
let worth_getting_root ({ context = (module Context); _ } as t) candidate =
69
  let module Context = struct
×
70
    include Context
71

72
    let logger =
73
      Logger.extend logger
×
74
        [ ( "selection_context"
75
          , `String "Bootstrap_controller.worth_getting_root" )
76
        ]
77
  end in
78
  Consensus.Hooks.equal_select_status `Take
79
  @@ Consensus.Hooks.select
80
       ~context:(module Context)
81
       ~existing:
82
         ( t.best_seen_transition |> Mina_block.Validation.block_with_hash
×
83
         |> With_hash.map ~f:Mina_block.consensus_state )
×
84
       ~candidate
85

86
let received_bad_proof ({ context = (module Context); _ } as t) host e =
87
  let open Context in
×
88
  Trust_system.(
89
    record t.trust_system logger host
90
      Actions.
91
        ( Violated_protocol
92
        , Some
93
            ( "Bad ancestor proof: $error"
94
            , [ ("error", Error_json.error_to_yojson e) ] ) ))
×
95

96
let done_syncing_root root_sync_ledger =
97
  Option.is_some (Sync_ledger.Db.peek_valid_tree root_sync_ledger)
×
98

99
let should_sync ~root_sync_ledger t candidate_state =
100
  (not @@ done_syncing_root root_sync_ledger)
×
101
  && worth_getting_root t candidate_state
×
102

103
(** Update [Synced_ledger]'s target and [best_seen_transition] and [current_root] accordingly. *)
104
let start_sync_job_with_peer ~sender ~root_sync_ledger
105
    ({ context = (module Context); _ } as t) peer_best_tip peer_root =
106
  let open Context in
×
107
  let%bind () =
108
    Trust_system.(
109
      record t.trust_system logger sender
×
110
        Actions.
111
          ( Fulfilled_request
112
          , Some ("Received verified peer root and best tip", []) ))
113
  in
114
  t.best_seen_transition <- peer_best_tip ;
×
115
  t.current_root <- peer_root ;
116
  let blockchain_state =
117
    t.current_root |> Mina_block.Validation.block |> Mina_block.header
×
118
    |> Mina_block.Header.protocol_state |> Protocol_state.blockchain_state
×
119
  in
120
  let expected_staged_ledger_hash =
×
121
    blockchain_state |> Blockchain_state.staged_ledger_hash
122
  in
123
  let snarked_ledger_hash =
×
124
    blockchain_state |> Blockchain_state.snarked_ledger_hash
125
  in
126
  return
×
127
  @@
128
  match
129
    Sync_ledger.Db.new_goal root_sync_ledger
130
      (Frozen_ledger_hash.to_ledger_hash snarked_ledger_hash)
×
131
      ~data:
132
        ( State_hash.With_state_hashes.state_hash
×
133
          @@ Mina_block.Validation.block_with_hash t.current_root
×
134
        , sender
135
        , expected_staged_ledger_hash )
136
      ~equal:(fun (hash1, _, _) (hash2, _, _) -> State_hash.equal hash1 hash2)
×
137
  with
138
  | `New ->
×
139
      t.num_of_root_snarked_ledger_retargeted <-
140
        t.num_of_root_snarked_ledger_retargeted + 1 ;
141
      `Syncing_new_snarked_ledger
142
  | `Update_data ->
×
143
      `Updating_root_transition
144
  | `Repeat ->
×
145
      `Ignored
146

147
let to_consensus_state h =
148
  Mina_block.Header.protocol_state h |> Protocol_state.consensus_state
×
149

150
(** For each transition, this function would compare it with the existing one.
151
    If the incoming transition is better, then download the merkle list from
152
    that transition to its root and verify it. If we get a better root than
153
    the existing one, then reset the Sync_ledger's target by calling
154
    [start_sync_job_with_peer] function. *)
155
let on_transition ({ context = (module Context); _ } as t) ~sender
156
    ~root_sync_ledger ~genesis_constants candidate_header =
157
  let open Context in
×
158
  let candidate_consensus_state =
159
    With_hash.map ~f:to_consensus_state candidate_header
160
  in
161
  if not @@ should_sync ~root_sync_ledger t candidate_consensus_state then
×
162
    Deferred.return `Ignored
×
163
  else
164
    match%bind
165
      Mina_networking.get_ancestry t.network sender.Peer.peer_id
×
166
        (With_hash.map_hash candidate_consensus_state
×
167
           ~f:State_hash.State_hashes.state_hash )
168
    with
169
    | Error e ->
×
170
        [%log error]
×
171
          ~metadata:[ ("error", Error_json.error_to_yojson e) ]
×
172
          !"Could not get the proof of the root transition from the network: \
173
            $error" ;
174
        Deferred.return `Ignored
×
175
    | Ok peer_root_with_proof -> (
×
176
        match%bind
177
          Sync_handler.Root.verify
×
178
            ~context:(module Context)
179
            ~verifier:t.verifier ~genesis_constants candidate_consensus_state
180
            peer_root_with_proof.data
181
        with
182
        | Ok (`Root root, `Best_tip best_tip) ->
×
183
            if done_syncing_root root_sync_ledger then return `Ignored
×
184
            else
185
              start_sync_job_with_peer ~sender ~root_sync_ledger t best_tip root
×
186
        | Error e ->
×
187
            return (received_bad_proof t sender e |> Fn.const `Ignored) )
×
188

189
(** A helper function that wraps the calls to Sync_ledger and iterate through
190
    incoming transitions, add those to the transition_cache and calls
191
    [on_transition] function. *)
192
let sync_ledger ({ context = (module Context); _ } as t) ~preferred
193
    ~root_sync_ledger ~transition_graph ~sync_ledger_reader ~genesis_constants =
194
  let open Context in
×
195
  let query_reader = Sync_ledger.Db.query_reader root_sync_ledger in
196
  let response_writer = Sync_ledger.Db.answer_writer root_sync_ledger in
×
197
  Mina_networking.glue_sync_ledger ~preferred t.network query_reader
×
198
    response_writer ;
199
  Reader.iter sync_ledger_reader ~f:(fun (b_or_h, `Valid_cb vc) ->
×
200
      let header_with_hash, sender, transition_cache_element =
×
201
        match b_or_h with
202
        | `Block b_env ->
×
203
            ( Envelope.Incoming.data b_env
×
204
              |> Mina_block.Validation.block_with_hash
×
205
              |> With_hash.map ~f:Mina_block.header
×
206
            , Envelope.Incoming.remote_sender_exn b_env
×
207
            , Envelope.Incoming.map ~f:(fun x -> `Block x) b_env )
×
208
        | `Header h_env ->
×
209
            ( Envelope.Incoming.data h_env
×
210
              |> Mina_block.Validation.header_with_hash
×
211
            , Envelope.Incoming.remote_sender_exn h_env
×
212
            , Envelope.Incoming.map ~f:(fun x -> `Header x) h_env )
×
213
      in
214
      let previous_state_hash =
215
        With_hash.data header_with_hash
×
216
        |> Mina_block.Header.protocol_state
×
217
        |> Protocol_state.previous_state_hash
218
      in
219
      Transition_cache.add transition_graph ~parent:previous_state_hash
×
220
        (transition_cache_element, vc) ;
221
      (* TODO: Efficiently limiting the number of green threads in #1337 *)
222
      if
×
223
        worth_getting_root t
224
          (With_hash.map ~f:to_consensus_state header_with_hash)
×
225
      then (
×
226
        [%log trace] "Added the transition from sync_ledger_reader into cache"
×
227
          ~metadata:
228
            [ ( "state_hash"
229
              , State_hash.to_yojson
×
230
                  (State_hash.With_state_hashes.state_hash header_with_hash) )
×
231
            ; ( "header"
232
              , Mina_block.Header.to_yojson (With_hash.data header_with_hash) )
×
233
            ] ;
234

235
        Deferred.ignore_m
×
236
        @@ on_transition t ~sender ~root_sync_ledger ~genesis_constants
×
237
             header_with_hash )
238
      else Deferred.unit )
×
239

240
let external_transition_compare ~context:(module Context : CONTEXT) =
241
  let get_consensus_state =
×
242
    Fn.compose Protocol_state.consensus_state Mina_block.Header.protocol_state
243
  in
244
  Comparable.lift
×
245
    (fun existing candidate ->
246
      (* To prevent the logger to spam a lot of messsages, the logger input is set to null *)
247
      if
×
248
        State_hash.equal
249
          (State_hash.With_state_hashes.state_hash existing)
×
250
          (State_hash.With_state_hashes.state_hash candidate)
×
251
      then 0
×
252
      else if
×
253
        Consensus.Hooks.equal_select_status `Keep
254
        @@ Consensus.Hooks.select ~context:(module Context) ~existing ~candidate
255
      then -1
×
256
      else 1 )
×
257
    ~f:(With_hash.map ~f:get_consensus_state)
258

259
(** The entry point function for bootstrap controller. When bootstrap finished
260
    it would return a transition frontier with the root breadcrumb and a list
261
    of transitions collected during bootstrap.
262

263
    Bootstrap controller would do the following steps to contrust the
264
    transition frontier:
265
    1. Download the root snarked_ledger.
266
    2. Download the scan state and pending coinbases.
267
    3. Construct the staged ledger from the snarked ledger, scan state and
268
       pending coinbases.
269
    4. Synchronize the consensus local state if necessary.
270
    5. Close the old frontier and reload a new one from disk.
271
 *)
272
let run ~context:(module Context : CONTEXT) ~trust_system ~verifier ~network
273
    ~consensus_local_state ~transition_reader ~preferred_peers ~persistent_root
274
    ~persistent_frontier ~initial_root_transition ~catchup_mode =
275
  let open Context in
×
276
  O1trace.thread "bootstrap" (fun () ->
277
      let genesis_constants =
×
278
        Precomputed_values.genesis_constants precomputed_values
279
      in
280
      let constraint_constants = precomputed_values.constraint_constants in
×
281
      let rec loop previous_cycles =
282
        let sync_ledger_pipe = "sync ledger pipe" in
×
283
        let sync_ledger_reader, sync_ledger_writer =
284
          create ~name:sync_ledger_pipe
285
            (Buffered
286
               ( `Capacity 50
287
               , `Overflow
288
                   (Drop_head
289
                      (fun (b_or_h, `Valid_cb valid_cb) ->
290
                        let hash =
×
291
                          match b_or_h with
292
                          | `Block b_env ->
×
293
                              Envelope.Incoming.data b_env
×
294
                              |> Mina_block.Validation.block_with_hash
×
295
                              |> With_hash.hash
×
296
                          | `Header h_env ->
×
297
                              Envelope.Incoming.data h_env
×
298
                              |> Mina_block.Validation.header_with_hash
×
299
                              |> With_hash.hash
×
300
                        in
301
                        Mina_metrics.(
302
                          Counter.inc_one
×
303
                            Pipe.Drop_on_overflow.bootstrap_sync_ledger) ;
304
                        Mina_block.handle_dropped_transition ?valid_cb hash
305
                          ~pipe_name:sync_ledger_pipe ~logger ) ) ) )
306
        in
307
        don't_wait_for
×
308
          (transfer_while_writer_alive transition_reader sync_ledger_writer
×
309
             ~f:Fn.id ) ;
310
        let initial_root_transition =
×
311
          initial_root_transition |> Mina_block.Validated.remember
×
312
          |> Mina_block.Validation.reset_frontier_dependencies_validation
×
313
          |> Mina_block.Validation.reset_staged_ledger_diff_validation
314
        in
315
        let t =
×
316
          { network
317
          ; context = (module Context)
318
          ; trust_system
319
          ; verifier
320
          ; best_seen_transition = initial_root_transition
321
          ; current_root = initial_root_transition
322
          ; num_of_root_snarked_ledger_retargeted = 0
323
          }
324
        in
325
        let transition_graph = Transition_cache.create () in
326
        let temp_persistent_root_instance =
×
327
          Transition_frontier.Persistent_root.create_instance_exn
328
            persistent_root
329
        in
330
        let temp_snarked_ledger =
×
331
          Transition_frontier.Persistent_root.Instance.snarked_ledger
332
            temp_persistent_root_instance
333
        in
334
        (* step 1. download snarked_ledger *)
335
        let%bind sync_ledger_time, (hash, sender, expected_staged_ledger_hash) =
336
          time_deferred
×
337
            (let root_sync_ledger =
338
               Sync_ledger.Db.create temp_snarked_ledger ~logger ~trust_system
339
             in
340
             don't_wait_for
×
341
               (sync_ledger t ~preferred:preferred_peers ~root_sync_ledger
×
342
                  ~transition_graph ~sync_ledger_reader ~genesis_constants ) ;
343
             (* We ignore the resulting ledger returned here since it will always
344
                * be the same as the ledger we started with because we are syncing
345
                * a db ledger. *)
346
             let%map _, data = Sync_ledger.Db.valid_tree root_sync_ledger in
×
347
             Sync_ledger.Db.destroy root_sync_ledger ;
×
348
             data )
×
349
        in
350
        Mina_metrics.(
×
351
          Counter.inc Bootstrap.root_snarked_ledger_sync_ms
×
352
            Time.Span.(to_ms sync_ledger_time)) ;
×
353
        Mina_metrics.(
354
          Gauge.set Bootstrap.num_of_root_snarked_ledger_retargeted
×
355
            (Float.of_int t.num_of_root_snarked_ledger_retargeted)) ;
×
356
        (* step 2. Download scan state and pending coinbases. *)
357
        let%bind ( staged_ledger_data_download_time
358
                 , staged_ledger_construction_time
359
                 , staged_ledger_aux_result ) =
360
          let%bind ( staged_ledger_data_download_time
361
                   , staged_ledger_data_download_result ) =
362
            time_deferred
×
363
              (Mina_networking
364
               .get_staged_ledger_aux_and_pending_coinbases_at_hash t.network
×
365
                 sender.peer_id hash )
366
          in
367
          match staged_ledger_data_download_result with
×
368
          | Error err ->
×
369
              Deferred.return (staged_ledger_data_download_time, None, Error err)
370
          | Ok
×
371
              ( scan_state
372
              , expected_merkle_root
373
              , pending_coinbases
374
              , protocol_states ) -> (
375
              let%map staged_ledger_construction_result =
376
                O1trace.thread "construct_root_staged_ledger" (fun () ->
×
377
                    let open Deferred.Or_error.Let_syntax in
×
378
                    let received_staged_ledger_hash =
379
                      Staged_ledger_hash.of_aux_ledger_and_coinbase_hash
380
                        (Staged_ledger.Scan_state.hash scan_state)
×
381
                        expected_merkle_root pending_coinbases
382
                    in
383
                    [%log debug]
×
384
                      ~metadata:
385
                        [ ( "expected_staged_ledger_hash"
386
                          , Staged_ledger_hash.to_yojson
×
387
                              expected_staged_ledger_hash )
388
                        ; ( "received_staged_ledger_hash"
389
                          , Staged_ledger_hash.to_yojson
×
390
                              received_staged_ledger_hash )
391
                        ]
392
                      "Comparing $expected_staged_ledger_hash to \
393
                       $received_staged_ledger_hash" ;
394
                    let%bind new_root =
395
                      t.current_root
396
                      |> Mina_block.Validation
397
                         .skip_frontier_dependencies_validation
×
398
                           `This_block_belongs_to_a_detached_subtree
399
                      |> Mina_block.Validation.validate_staged_ledger_hash
×
400
                           (`Staged_ledger_already_materialized
401
                             received_staged_ledger_hash )
402
                      |> Result.map_error ~f:(fun _ ->
×
403
                             Error.of_string
×
404
                               "received faulty scan state from peer" )
405
                      |> Deferred.return
×
406
                    in
407
                    let protocol_states =
×
408
                      List.map protocol_states
409
                        ~f:(With_hash.of_data ~hash_data:Protocol_state.hashes)
410
                    in
411
                    let%bind protocol_states =
412
                      Staged_ledger.Scan_state.check_required_protocol_states
×
413
                        scan_state ~protocol_states
414
                      |> Deferred.return
×
415
                    in
416
                    let protocol_states_map =
×
417
                      protocol_states
418
                      |> List.map ~f:(fun ps ->
×
419
                             (State_hash.With_state_hashes.state_hash ps, ps) )
×
420
                      |> State_hash.Map.of_alist_exn
421
                    in
422
                    let get_state hash =
×
423
                      match Map.find protocol_states_map hash with
×
424
                      | None ->
×
425
                          let new_state_hash =
426
                            State_hash.With_state_hashes.state_hash
427
                              (fst new_root)
×
428
                          in
429
                          [%log error]
×
430
                            ~metadata:
431
                              [ ("new_root", State_hash.to_yojson new_state_hash)
×
432
                              ; ("state_hash", State_hash.to_yojson hash)
×
433
                              ]
434
                            "Protocol state (for scan state transactions) for \
435
                             $state_hash not found when bootstrapping to the \
436
                             new root $new_root" ;
437
                          Or_error.errorf
×
438
                            !"Protocol state (for scan state transactions) for \
×
439
                              %{sexp:State_hash.t} not found when \
440
                              bootstrapping to the new root \
441
                              %{sexp:State_hash.t}"
442
                            hash new_state_hash
443
                      | Some protocol_state ->
×
444
                          Ok (With_hash.data protocol_state)
×
445
                    in
446
                    (* step 3. Construct staged ledger from snarked ledger, scan state
447
                       and pending coinbases. *)
448
                    (* Construct the staged ledger before constructing the transition
449
                     * frontier in order to verify the scan state we received.
450
                     * TODO: reorganize the code to avoid doing this twice (#3480) *)
451
                    let open Deferred.Let_syntax in
452
                    let%map staged_ledger_construction_time, construction_result
453
                        =
454
                      time_deferred
×
455
                        (let open Deferred.Let_syntax in
456
                        let temp_mask =
457
                          Ledger.of_database temp_snarked_ledger
458
                        in
459
                        let%map result =
460
                          Staged_ledger
461
                          .of_scan_state_pending_coinbases_and_snarked_ledger
462
                            ~logger
463
                            ~snarked_local_state:
464
                              Mina_block.(
465
                                t.current_root |> Validation.block |> header
×
466
                                |> Header.protocol_state
×
467
                                |> Protocol_state.blockchain_state
×
468
                                |> Blockchain_state.snarked_local_state)
×
469
                            ~verifier ~constraint_constants ~scan_state
470
                            ~snarked_ledger:temp_mask ~expected_merkle_root
471
                            ~pending_coinbases ~get_state
472
                        in
473
                        ignore
×
474
                          ( Ledger.Maskable.unregister_mask_exn ~loc:__LOC__
×
475
                              temp_mask
476
                            : Ledger.unattached_mask ) ;
477
                        Result.map result
478
                          ~f:
479
                            (const
×
480
                               ( scan_state
481
                               , pending_coinbases
482
                               , new_root
483
                               , protocol_states ) ))
484
                    in
485
                    Ok (staged_ledger_construction_time, construction_result) )
×
486
              in
487
              match staged_ledger_construction_result with
×
488
              | Error err ->
×
489
                  (staged_ledger_data_download_time, None, Error err)
490
              | Ok (staged_ledger_construction_time, result) ->
×
491
                  ( staged_ledger_data_download_time
492
                  , Some staged_ledger_construction_time
493
                  , result ) )
494
        in
495
        Transition_frontier.Persistent_root.Instance.close
×
496
          temp_persistent_root_instance ;
497
        match staged_ledger_aux_result with
×
498
        | Error e ->
×
499
            let%bind () =
500
              Trust_system.(
501
                record t.trust_system logger sender
×
502
                  Actions.
503
                    ( Outgoing_connection_error
504
                    , Some
505
                        ( "Can't find scan state from the peer or received \
506
                           faulty scan state from the peer."
507
                        , [] ) ))
508
            in
509
            [%log error]
×
510
              ~metadata:
511
                [ ("error", Error_json.error_to_yojson e)
×
512
                ; ("state_hash", State_hash.to_yojson hash)
×
513
                ; ( "expected_staged_ledger_hash"
514
                  , Staged_ledger_hash.to_yojson expected_staged_ledger_hash )
×
515
                ]
516
              "Failed to find scan state for the transition with hash \
517
               $state_hash from the peer or received faulty scan state: \
518
               $error. Retry bootstrap" ;
519
            Writer.close sync_ledger_writer ;
×
520
            let this_cycle =
×
521
              { cycle_result = "failed to download and construct scan state"
522
              ; sync_ledger_time
523
              ; staged_ledger_data_download_time
524
              ; staged_ledger_construction_time
525
              ; local_state_sync_required = false
526
              ; local_state_sync_time = None
527
              }
528
            in
529
            loop (this_cycle :: previous_cycles)
530
        | Ok (scan_state, pending_coinbase, new_root, protocol_states) -> (
×
531
            let%bind () =
532
              Trust_system.(
533
                record t.trust_system logger sender
×
534
                  Actions.
535
                    ( Fulfilled_request
536
                    , Some ("Received valid scan state from peer", []) ))
537
            in
538
            let best_seen_block_with_hash, _ = t.best_seen_transition in
×
539
            let consensus_state =
540
              With_hash.data best_seen_block_with_hash
×
541
              |> Mina_block.header |> Mina_block.Header.protocol_state
×
542
              |> Protocol_state.consensus_state
543
            in
544
            (* step 4. Synchronize consensus local state if necessary *)
545
            let%bind ( local_state_sync_time
546
                     , (local_state_sync_required, local_state_sync_result) ) =
547
              time_deferred
×
548
                ( match
549
                    Consensus.Hooks.required_local_state_sync
550
                      ~constants:precomputed_values.consensus_constants
551
                      ~consensus_state ~local_state:consensus_local_state
552
                  with
553
                | None ->
×
554
                    [%log debug]
×
555
                      ~metadata:
556
                        [ ( "local_state"
557
                          , Consensus.Data.Local_state.to_yojson
×
558
                              consensus_local_state )
559
                        ; ( "consensus_state"
560
                          , Consensus.Data.Consensus_state.Value.to_yojson
×
561
                              consensus_state )
562
                        ]
563
                      "Not synchronizing consensus local state" ;
564
                    Deferred.return (false, Or_error.return ())
×
565
                | Some sync_jobs ->
×
566
                    [%log info] "Synchronizing consensus local state" ;
×
567
                    let%map result =
568
                      Consensus.Hooks.sync_local_state
×
569
                        ~context:(module Context)
570
                        ~local_state:consensus_local_state ~trust_system
571
                        ~glue_sync_ledger:
572
                          (Mina_networking.glue_sync_ledger t.network)
×
573
                        sync_jobs
574
                    in
575
                    (true, result) )
×
576
            in
577
            match local_state_sync_result with
×
578
            | Error e ->
×
579
                [%log error]
×
580
                  ~metadata:[ ("error", Error_json.error_to_yojson e) ]
×
581
                  "Local state sync failed: $error. Retry bootstrap" ;
582
                Writer.close sync_ledger_writer ;
×
583
                let this_cycle =
×
584
                  { cycle_result = "failed to synchronize local state"
585
                  ; sync_ledger_time
586
                  ; staged_ledger_data_download_time
587
                  ; staged_ledger_construction_time
588
                  ; local_state_sync_required
589
                  ; local_state_sync_time = Some local_state_sync_time
590
                  }
591
                in
592
                loop (this_cycle :: previous_cycles)
593
            | Ok () ->
×
594
                (* step 5. Close the old frontier and reload a new one from disk. *)
595
                let new_root_data : Transition_frontier.Root_data.Limited.t =
596
                  Transition_frontier.Root_data.Limited.create
597
                    ~transition:(Mina_block.Validated.lift new_root)
×
598
                    ~scan_state ~pending_coinbase ~protocol_states
599
                in
600
                let%bind () =
601
                  Transition_frontier.Persistent_frontier.reset_database_exn
×
602
                    persistent_frontier ~root_data:new_root_data
603
                    ~genesis_state_hash:
604
                      (State_hash.With_state_hashes.state_hash
×
605
                         precomputed_values.protocol_state_with_hashes )
606
                in
607
                (* TODO: lazy load db in persistent root to avoid unecessary opens like this *)
608
                Transition_frontier.Persistent_root.(
×
609
                  with_instance_exn persistent_root ~f:(fun instance ->
×
610
                      Instance.set_root_state_hash instance
×
611
                      @@ Mina_block.Validated.state_hash
×
612
                      @@ Mina_block.Validated.lift new_root )) ;
×
613
                let%map new_frontier =
614
                  let fail msg =
615
                    failwith
×
616
                      ( "failed to initialize transition frontier after \
617
                         bootstrapping: " ^ msg )
618
                  in
619
                  Transition_frontier.load
×
620
                    ~context:(module Context)
621
                    ~retry_with_fresh_db:false ~verifier ~consensus_local_state
622
                    ~persistent_root ~persistent_frontier ~catchup_mode ()
623
                  >>| function
×
624
                  | Ok frontier ->
×
625
                      frontier
626
                  | Error (`Failure msg) ->
×
627
                      fail msg
628
                  | Error `Bootstrap_required ->
×
629
                      fail
630
                        "bootstrap still required (indicates logical error in \
631
                         code)"
632
                  | Error `Persistent_frontier_malformed ->
×
633
                      fail "persistent frontier was malformed"
634
                  | Error `Snarked_ledger_mismatch ->
×
635
                      fail
636
                        "this should not happen, because we just reset the \
637
                         snarked_ledger"
638
                in
639
                [%str_log info] Bootstrap_complete ;
×
640
                let collected_transitions =
×
641
                  Transition_cache.data transition_graph
642
                in
643
                let logger =
×
644
                  Logger.extend logger
645
                    [ ( "context"
646
                      , `String "Filter collected transitions in bootstrap" )
647
                    ]
648
                in
649
                let root_consensus_state =
×
650
                  Transition_frontier.(
651
                    Breadcrumb.consensus_state_with_hashes (root new_frontier))
×
652
                in
653
                let filtered_collected_transitions =
654
                  List.filter collected_transitions
655
                    ~f:(fun (incoming_transition, _) ->
656
                      let transition =
×
657
                        Envelope.Incoming.data incoming_transition
×
658
                        |> Transition_cache.header_with_hash
659
                      in
660
                      Consensus.Hooks.equal_select_status `Take
×
661
                      @@ Consensus.Hooks.select
662
                           ~context:(module Context)
663
                           ~existing:root_consensus_state
664
                           ~candidate:
665
                             (With_hash.map
×
666
                                ~f:
667
                                  (Fn.compose Protocol_state.consensus_state
×
668
                                     Mina_block.Header.protocol_state )
669
                                transition ) )
670
                in
671
                [%log debug] "Sorting filtered transitions by consensus state"
×
672
                  ~metadata:[] ;
673
                let sorted_filtered_collected_transitions =
×
674
                  O1trace.sync_thread "sorting_collected_transitions" (fun () ->
675
                      List.sort filtered_collected_transitions
×
676
                        ~compare:
677
                          (Comparable.lift
×
678
                             ~f:(fun (x, _) ->
679
                               Transition_cache.header_with_hash
×
680
                               @@ Envelope.Incoming.data x )
×
681
                             (external_transition_compare
682
                                ~context:(module Context) ) ) )
683
                in
684
                let this_cycle =
×
685
                  { cycle_result = "success"
686
                  ; sync_ledger_time
687
                  ; staged_ledger_data_download_time
688
                  ; staged_ledger_construction_time
689
                  ; local_state_sync_required
690
                  ; local_state_sync_time = Some local_state_sync_time
691
                  }
692
                in
693
                ( this_cycle :: previous_cycles
694
                , (new_frontier, sorted_filtered_collected_transitions) ) )
695
      in
696
      let%map time_elapsed, (cycles, result) = time_deferred (loop []) in
×
697
      [%log info] "Bootstrap completed in $time_elapsed: $bootstrap_stats"
×
698
        ~metadata:
699
          [ ("time_elapsed", time_to_yojson time_elapsed)
×
700
          ; ( "bootstrap_stats"
701
            , `List (List.map ~f:bootstrap_cycle_stats_to_yojson cycles) )
×
702
          ] ;
703
      Mina_metrics.(
×
704
        Gauge.set Bootstrap.bootstrap_time_ms
×
705
          Core.Time.(Span.to_ms @@ time_elapsed)) ;
×
706
      result )
707

708
let%test_module "Bootstrap_controller tests" =
709
  ( module struct
710
    open Pipe_lib
711

712
    let max_frontier_length =
713
      Transition_frontier.global_max_length Genesis_constants.For_unit_tests.t
×
714

715
    let logger = Logger.create ()
×
716

717
    let trust_system =
718
      let s = Trust_system.null () in
719
      don't_wait_for
×
720
        (Pipe_lib.Strict_pipe.Reader.iter
×
721
           (Trust_system.upcall_pipe s)
×
722
           ~f:(const Deferred.unit) ) ;
×
723
      s
×
724

725
    let precomputed_values = Lazy.force Precomputed_values.for_unit_tests
×
726

727
    let proof_level = precomputed_values.proof_level
728

729
    let constraint_constants = precomputed_values.constraint_constants
730

731
    let compile_config = Mina_compile_config.For_unit_tests.t
732

733
    module Context = struct
734
      let logger = Logger.create ()
×
735

736
      let precomputed_values = precomputed_values
737

738
      let constraint_constants =
739
        Genesis_constants.For_unit_tests.Constraint_constants.t
740

741
      let consensus_constants = precomputed_values.consensus_constants
742

743
      let compile_config = compile_config
744
    end
745

746
    let verifier =
747
      Async.Thread_safe.block_on_async_exn (fun () ->
×
748
          Verifier.For_tests.default ~constraint_constants ~logger ~proof_level
×
749
            () )
750

751
    module Genesis_ledger = (val precomputed_values.genesis_ledger)
752

753
    let downcast_transition ~sender transition =
754
      let transition =
×
755
        transition |> Mina_block.Validated.remember
×
756
        |> Mina_block.Validation.reset_frontier_dependencies_validation
×
757
        |> Mina_block.Validation.reset_staged_ledger_diff_validation
758
      in
759
      Envelope.Incoming.wrap ~data:transition
×
760
        ~sender:(Envelope.Sender.Remote sender)
761

762
    let downcast_breadcrumb ~sender breadcrumb =
763
      downcast_transition ~sender
×
764
        (Transition_frontier.Breadcrumb.validated_transition breadcrumb)
×
765

766
    let make_non_running_bootstrap ~genesis_root ~network =
767
      let transition =
×
768
        genesis_root
769
        |> Mina_block.Validation.reset_frontier_dependencies_validation
×
770
        |> Mina_block.Validation.reset_staged_ledger_diff_validation
771
      in
772
      { context = (module Context)
×
773
      ; trust_system
774
      ; verifier
775
      ; best_seen_transition = transition
776
      ; current_root = transition
777
      ; network
778
      ; num_of_root_snarked_ledger_retargeted = 0
779
      }
780

781
    let%test_unit "Bootstrap controller caches all transitions it is passed \
782
                   through the transition_reader" =
783
      let branch_size = (max_frontier_length * 2) + 2 in
×
784
      Quickcheck.test ~trials:1
785
        (let open Quickcheck.Generator.Let_syntax in
786
        (* we only need one node for this test, but we need more than one peer so that mina_networking does not throw an error *)
787
        let%bind fake_network =
788
          Fake_network.Generator.(
789
            gen ~precomputed_values ~verifier ~max_frontier_length
×
790
              ~compile_config [ fresh_peer; fresh_peer ]
791
              ~use_super_catchup:false)
792
        in
793
        let%map make_branch =
794
          Transition_frontier.Breadcrumb.For_tests.gen_seq ~precomputed_values
×
795
            ~verifier
796
            ~accounts_with_secret_keys:(Lazy.force Genesis_ledger.accounts)
×
797
            branch_size
798
        in
799
        let [ me; _ ] = fake_network.peer_networks in
×
800
        let branch =
801
          Async.Thread_safe.block_on_async_exn (fun () ->
802
              make_branch (Transition_frontier.root me.state.frontier) )
×
803
        in
804
        (fake_network, branch))
×
805
        ~f:(fun (fake_network, branch) ->
806
          let [ me; other ] = fake_network.peer_networks in
×
807
          let genesis_root =
808
            Transition_frontier.(
809
              Breadcrumb.validated_transition @@ root me.state.frontier)
×
810
            |> Mina_block.Validated.remember
811
          in
812
          let transition_graph = Transition_cache.create () in
×
813
          let sync_ledger_reader, sync_ledger_writer =
×
814
            Pipe_lib.Strict_pipe.create ~name:"sync_ledger_reader" Synchronous
815
          in
816
          let bootstrap =
×
817
            make_non_running_bootstrap ~genesis_root ~network:me.network
818
          in
819
          let root_sync_ledger =
820
            Sync_ledger.Db.create
821
              (Transition_frontier.root_snarked_ledger me.state.frontier)
×
822
              ~logger ~trust_system
823
          in
824
          Async.Thread_safe.block_on_async_exn (fun () ->
×
825
              let sync_deferred =
×
826
                sync_ledger bootstrap ~root_sync_ledger ~transition_graph
827
                  ~preferred:[] ~sync_ledger_reader
828
                  ~genesis_constants:Genesis_constants.For_unit_tests.t
829
              in
830
              let%bind () =
831
                Deferred.List.iter branch ~f:(fun breadcrumb ->
×
832
                    Strict_pipe.Writer.write sync_ledger_writer
×
833
                      ( `Block
834
                          (downcast_breadcrumb ~sender:other.peer breadcrumb)
×
835
                      , `Valid_cb None ) )
836
              in
837
              Strict_pipe.Writer.close sync_ledger_writer ;
×
838
              sync_deferred ) ;
×
839
          let expected_transitions =
×
840
            List.map branch
841
              ~f:
842
                (Fn.compose
×
843
                   (With_hash.map ~f:Mina_block.header)
844
                   (Fn.compose Mina_block.Validation.block_with_hash
×
845
                      (Fn.compose Mina_block.Validated.remember
×
846
                         Transition_frontier.Breadcrumb.validated_transition ) ) )
847
          in
848
          let saved_transitions =
×
849
            Transition_cache.data transition_graph
×
850
            |> List.map ~f:(fun (x, _) ->
851
                   Transition_cache.header_with_hash @@ Envelope.Incoming.data x )
×
852
          in
853
          let module E = struct
×
854
            module T = struct
855
              type t = Mina_block.Header.t State_hash.With_state_hashes.t
×
856
              [@@deriving sexp]
857

858
              let compare = external_transition_compare ~context:(module Context)
859
            end
860

861
            include Comparable.Make (T)
862
          end in
863
          [%test_result: E.Set.t]
×
864
            (E.Set.of_list saved_transitions)
×
865
            ~expect:(E.Set.of_list expected_transitions) )
×
866

867
    let run_bootstrap ~timeout_duration ~my_net ~transition_reader =
868
      let open Fake_network in
×
869
      let time_controller = Block_time.Controller.basic ~logger in
870
      let persistent_root =
871
        Transition_frontier.persistent_root my_net.state.frontier
872
      in
873
      let persistent_frontier =
×
874
        Transition_frontier.persistent_frontier my_net.state.frontier
875
      in
876
      let initial_root_transition =
×
877
        Transition_frontier.(
878
          Breadcrumb.validated_transition (root my_net.state.frontier))
×
879
      in
880
      let%bind () =
881
        Transition_frontier.close ~loc:__LOC__ my_net.state.frontier
×
882
      in
883
      [%log info] "bootstrap begin" ;
×
884
      Block_time.Timeout.await_exn time_controller ~timeout_duration
×
885
        (run
886
           ~context:(module Context)
887
           ~trust_system ~verifier ~network:my_net.network ~preferred_peers:[]
888
           ~consensus_local_state:my_net.state.consensus_local_state
889
           ~transition_reader ~persistent_root ~persistent_frontier
890
           ~catchup_mode:`Normal ~initial_root_transition )
891

892
    let assert_transitions_increasingly_sorted ~root
893
        (incoming_transitions : Transition_cache.element list) =
894
      let root =
×
895
        Transition_frontier.Breadcrumb.block root |> Mina_block.header
×
896
      in
897
      ignore
×
898
        ( List.fold_result ~init:root incoming_transitions
×
899
            ~f:(fun max_acc incoming_transition ->
900
              let With_hash.{ data = header; _ } =
×
901
                Transition_cache.header_with_hash
902
                  (Envelope.Incoming.data @@ fst incoming_transition)
×
903
              in
904
              let header_len h =
×
905
                Mina_block.Header.protocol_state h
×
906
                |> Protocol_state.consensus_state
×
907
                |> Consensus.Data.Consensus_state.blockchain_length
908
              in
909
              let open Result.Let_syntax in
910
              let%map () =
911
                Result.ok_if_true
×
912
                  Mina_numbers.Length.(header_len max_acc <= header_len header)
×
913
                  ~error:
914
                    (Error.of_string
×
915
                       "The blocks are not sorted in increasing order" )
916
              in
917
              header )
×
918
          |> Or_error.ok_exn
×
919
          : Mina_block.Header.t )
920

921
    let%test_unit "sync with one node after receiving a transition" =
922
      Quickcheck.test ~trials:1
×
923
        Fake_network.Generator.(
924
          gen ~precomputed_values ~verifier ~max_frontier_length
×
925
            ~use_super_catchup:false ~compile_config
926
            [ fresh_peer
927
            ; peer_with_branch
928
                ~frontier_branch_size:((max_frontier_length * 2) + 2)
929
            ])
930
        ~f:(fun fake_network ->
931
          let [ my_net; peer_net ] = fake_network.peer_networks in
×
932
          let transition_reader, transition_writer =
933
            Pipe_lib.Strict_pipe.create ~name:(__MODULE__ ^ __LOC__)
934
              (Buffered (`Capacity 10, `Overflow (Drop_head ignore)))
935
          in
936
          let block =
×
937
            Envelope.Incoming.wrap
938
              ~data:
939
                ( Transition_frontier.best_tip peer_net.state.frontier
×
940
                |> Transition_frontier.Breadcrumb.validated_transition
×
941
                |> Mina_block.Validated.remember
×
942
                |> Mina_block.Validation.reset_frontier_dependencies_validation
×
943
                |> Mina_block.Validation.reset_staged_ledger_diff_validation )
×
944
              ~sender:(Envelope.Sender.Remote peer_net.peer)
945
          in
946
          Pipe_lib.Strict_pipe.Writer.write transition_writer
947
            (`Block block, `Valid_cb None) ;
948
          let new_frontier, sorted_external_transitions =
×
949
            Async.Thread_safe.block_on_async_exn (fun () ->
950
                run_bootstrap
×
951
                  ~timeout_duration:(Block_time.Span.of_ms 30_000L)
×
952
                  ~my_net ~transition_reader )
953
          in
954
          assert_transitions_increasingly_sorted
×
955
            ~root:(Transition_frontier.root new_frontier)
×
956
            sorted_external_transitions ;
957
          [%test_result: Ledger_hash.t]
×
958
            ( Ledger.Db.merkle_root
×
959
            @@ Transition_frontier.root_snarked_ledger new_frontier )
×
960
            ~expect:
961
              ( Ledger.Db.merkle_root
×
962
              @@ Transition_frontier.root_snarked_ledger peer_net.state.frontier
×
963
              ) )
964

965
    let%test_unit "reconstruct staged_ledgers using \
966
                   of_scan_state_and_snarked_ledger" =
967
      Quickcheck.test ~trials:1
×
968
        (Transition_frontier.For_tests.gen ~precomputed_values ~verifier
×
969
           ~max_length:max_frontier_length ~size:max_frontier_length () )
970
        ~f:(fun frontier ->
971
          Thread_safe.block_on_async_exn
×
972
          @@ fun () ->
973
          Deferred.List.iter (Transition_frontier.all_breadcrumbs frontier)
×
974
            ~f:(fun breadcrumb ->
975
              let staged_ledger =
×
976
                Transition_frontier.Breadcrumb.staged_ledger breadcrumb
977
              in
978
              let expected_merkle_root =
×
979
                Staged_ledger.ledger staged_ledger |> Ledger.merkle_root
×
980
              in
981
              let snarked_ledger =
×
982
                Transition_frontier.root_snarked_ledger frontier
×
983
                |> Ledger.of_database
984
              in
985
              let snarked_local_state =
×
986
                Transition_frontier.root frontier
×
987
                |> Transition_frontier.Breadcrumb.protocol_state
×
988
                |> Protocol_state.blockchain_state
×
989
                |> Blockchain_state.snarked_local_state
990
              in
991
              let scan_state = Staged_ledger.scan_state staged_ledger in
×
992
              let get_state hash =
×
993
                match Transition_frontier.find_protocol_state frontier hash with
×
994
                | Some protocol_state ->
×
995
                    Ok protocol_state
996
                | None ->
×
997
                    Or_error.errorf
998
                      !"Protocol state (for scan state transactions) for \
×
999
                        %{sexp:State_hash.t} not found"
1000
                      hash
1001
              in
1002
              let pending_coinbases =
1003
                Staged_ledger.pending_coinbase_collection staged_ledger
1004
              in
1005
              let%map actual_staged_ledger =
1006
                Staged_ledger.of_scan_state_pending_coinbases_and_snarked_ledger
1007
                  ~scan_state ~logger ~verifier ~constraint_constants
1008
                  ~snarked_ledger ~snarked_local_state ~expected_merkle_root
1009
                  ~pending_coinbases ~get_state
1010
                |> Deferred.Or_error.ok_exn
×
1011
              in
1012
              assert (
×
1013
                Staged_ledger_hash.equal
×
1014
                  (Staged_ledger.hash staged_ledger)
×
1015
                  (Staged_ledger.hash actual_staged_ledger) ) ) )
×
1016

1017
    (*
1018
    let%test_unit "if we see a new transition that is better than the \
1019
                   transition that we are syncing from, than we should \
1020
                   retarget our root" =
1021
      Quickcheck.test ~trials:1
1022
        Fake_network.Generator.(
1023
          gen ~max_frontier_length
1024
            [ fresh_peer
1025
            ; peer_with_branch ~frontier_branch_size:max_frontier_length
1026
            ; peer_with_branch
1027
                ~frontier_branch_size:((max_frontier_length * 2) + 2) ])
1028
        ~f:(fun fake_network ->
1029
          let [me; weaker_chain; stronger_chain] =
1030
            fake_network.peer_networks
1031
          in
1032
          let transition_reader, transition_writer =
1033
            Pipe_lib.Strict_pipe.create ~name:(__MODULE__ ^ __LOC__)
1034
              (Buffered (`Capacity 10, `Overflow Drop_head))
1035
          in
1036
          Envelope.Incoming.wrap
1037
            ~data:
1038
              ( Transition_frontier.best_tip weaker_chain.state.frontier
1039
              |> Transition_frontier.Breadcrumb.validated_transition
1040
              |> Mina_block.Validated.to_initial_validated )
1041
            ~sender:
1042
              (Envelope.Sender.Remote
1043
                 (weaker_chain.peer.host, weaker_chain.peer.peer_id))
1044
          |> Pipe_lib.Strict_pipe.Writer.write transition_writer ;
1045
          Envelope.Incoming.wrap
1046
            ~data:
1047
              ( Transition_frontier.best_tip stronger_chain.state.frontier
1048
              |> Transition_frontier.Breadcrumb.validated_transition
1049
              |> Mina_block.Validated.to_initial_validated )
1050
            ~sender:
1051
              (Envelope.Sender.Remote
1052
                 (stronger_chain.peer.host, stronger_chain.peer.peer_id))
1053
          |> Pipe_lib.Strict_pipe.Writer.write transition_writer ;
1054
          let new_frontier, sorted_external_transitions =
1055
            Async.Thread_safe.block_on_async_exn (fun () ->
1056
                run_bootstrap
1057
                  ~timeout_duration:(Block_time.Span.of_ms 60_000L)
1058
                  ~my_net:me ~transition_reader )
1059
          in
1060
          assert_transitions_increasingly_sorted
1061
            ~root:(Transition_frontier.root new_frontier)
1062
            sorted_external_transitions ;
1063
          [%test_result: Ledger_hash.t]
1064
            ( Ledger.Db.merkle_root
1065
            @@ Transition_frontier.root_snarked_ledger new_frontier )
1066
            ~expect:
1067
              ( Ledger.Db.merkle_root
1068
              @@ Transition_frontier.root_snarked_ledger
1069
                   stronger_chain.state.frontier ) )
1070
*)
1071
  end )
4✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc