• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

MinaProtocol / mina / 2845

31 Oct 2024 09:52AM UTC coverage: 33.163% (-27.7%) from 60.847%
2845

push

buildkite

web-flow
Merge pull request #15891 from MinaProtocol/dkijania/update_badges_comp

[Compatible] Update badges and logo

22200 of 66943 relevant lines covered (33.16%)

21864.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.55
/src/lib/bootstrap_controller/bootstrap_controller.ml
1
(* Only show stdout for failed inline tests. *)
1✔
2
open Inline_test_quiet_logs
3
open Core
4
open Async
5
open Mina_base
6
module Ledger = Mina_ledger.Ledger
7
module Sync_ledger = Mina_ledger.Sync_ledger
8
open Mina_state
9
open Pipe_lib.Strict_pipe
10
open Network_peer
11
module Transition_cache = Transition_cache
12

13
module type CONTEXT = sig
14
  val logger : Logger.t
15

16
  val precomputed_values : Precomputed_values.t
17

18
  val constraint_constants : Genesis_constants.Constraint_constants.t
19

20
  val consensus_constants : Consensus.Constants.t
21

22
  val compile_config : Mina_compile_config.t
23
end
24

25
type Structured_log_events.t += Bootstrap_complete
26
  [@@deriving register_event { msg = "Bootstrap state: complete." }]
×
27

28
type t =
29
  { context : (module CONTEXT)
30
  ; trust_system : Trust_system.t
31
  ; verifier : Verifier.t
32
  ; mutable best_seen_transition : Mina_block.initial_valid_block
33
  ; mutable current_root : Mina_block.initial_valid_block
34
  ; network : Mina_networking.t
35
  ; mutable num_of_root_snarked_ledger_retargeted : int
36
  }
37

38
type time = Time.Span.t
39

40
let time_to_yojson span =
41
  `String (Printf.sprintf "%f seconds" (Time.Span.to_sec span))
×
42

43
type opt_time = time option
44

45
let opt_time_to_yojson = function
46
  | Some time ->
×
47
      time_to_yojson time
48
  | None ->
×
49
      `Null
50

51
(** An auxiliary data structure for collecting various metrics for boostrap controller. *)
52
type bootstrap_cycle_stats =
×
53
  { cycle_result : string
×
54
  ; sync_ledger_time : time
×
55
  ; staged_ledger_data_download_time : time
×
56
  ; staged_ledger_construction_time : opt_time
×
57
  ; local_state_sync_required : bool
×
58
  ; local_state_sync_time : opt_time
×
59
  }
60
[@@deriving to_yojson]
61

62
let time_deferred deferred =
63
  let start_time = Time.now () in
×
64
  let%map result = deferred in
65
  let end_time = Time.now () in
×
66
  (Time.diff end_time start_time, result)
×
67

68
let worth_getting_root ({ context = (module Context); _ } as t) candidate =
69
  let module Context = struct
×
70
    include Context
71

72
    let logger =
73
      Logger.extend logger
×
74
        [ ( "selection_context"
75
          , `String "Bootstrap_controller.worth_getting_root" )
76
        ]
77
  end in
78
  Consensus.Hooks.equal_select_status `Take
79
  @@ Consensus.Hooks.select
80
       ~context:(module Context)
81
       ~existing:
82
         ( t.best_seen_transition |> Mina_block.Validation.block_with_hash
×
83
         |> With_hash.map ~f:Mina_block.consensus_state )
×
84
       ~candidate
85

86
let received_bad_proof ({ context = (module Context); _ } as t) host e =
87
  let open Context in
×
88
  Trust_system.(
89
    record t.trust_system logger host
90
      Actions.
91
        ( Violated_protocol
92
        , Some
93
            ( "Bad ancestor proof: $error"
94
            , [ ("error", Error_json.error_to_yojson e) ] ) ))
×
95

96
let done_syncing_root root_sync_ledger =
97
  Option.is_some (Sync_ledger.Db.peek_valid_tree root_sync_ledger)
×
98

99
let should_sync ~root_sync_ledger t candidate_state =
100
  (not @@ done_syncing_root root_sync_ledger)
×
101
  && worth_getting_root t candidate_state
×
102

103
(** Update [Synced_ledger]'s target and [best_seen_transition] and [current_root] accordingly. *)
104
let start_sync_job_with_peer ~sender ~root_sync_ledger
105
    ({ context = (module Context); _ } as t) peer_best_tip peer_root =
106
  let open Context in
×
107
  let%bind () =
108
    Trust_system.(
109
      record t.trust_system logger sender
×
110
        Actions.
111
          ( Fulfilled_request
112
          , Some ("Received verified peer root and best tip", []) ))
113
  in
114
  t.best_seen_transition <- peer_best_tip ;
×
115
  t.current_root <- peer_root ;
116
  let blockchain_state =
117
    t.current_root |> Mina_block.Validation.block |> Mina_block.header
×
118
    |> Mina_block.Header.protocol_state |> Protocol_state.blockchain_state
×
119
  in
120
  let expected_staged_ledger_hash =
×
121
    blockchain_state |> Blockchain_state.staged_ledger_hash
122
  in
123
  let snarked_ledger_hash =
×
124
    blockchain_state |> Blockchain_state.snarked_ledger_hash
125
  in
126
  return
×
127
  @@
128
  match
129
    Sync_ledger.Db.new_goal root_sync_ledger
130
      (Frozen_ledger_hash.to_ledger_hash snarked_ledger_hash)
×
131
      ~data:
132
        ( State_hash.With_state_hashes.state_hash
×
133
          @@ Mina_block.Validation.block_with_hash t.current_root
×
134
        , sender
135
        , expected_staged_ledger_hash )
136
      ~equal:(fun (hash1, _, _) (hash2, _, _) -> State_hash.equal hash1 hash2)
×
137
  with
138
  | `New ->
×
139
      t.num_of_root_snarked_ledger_retargeted <-
140
        t.num_of_root_snarked_ledger_retargeted + 1 ;
141
      `Syncing_new_snarked_ledger
142
  | `Update_data ->
×
143
      `Updating_root_transition
144
  | `Repeat ->
×
145
      `Ignored
146

147
let to_consensus_state h =
148
  Mina_block.Header.protocol_state h |> Protocol_state.consensus_state
×
149

150
(** For each transition, this function would compare it with the existing one.
151
    If the incoming transition is better, then download the merkle list from
152
    that transition to its root and verify it. If we get a better root than
153
    the existing one, then reset the Sync_ledger's target by calling
154
    [start_sync_job_with_peer] function. *)
155
let on_transition ({ context = (module Context); _ } as t) ~sender
156
    ~root_sync_ledger ~genesis_constants candidate_header =
157
  let open Context in
×
158
  let candidate_consensus_state =
159
    With_hash.map ~f:to_consensus_state candidate_header
160
  in
161
  if not @@ should_sync ~root_sync_ledger t candidate_consensus_state then
×
162
    Deferred.return `Ignored
×
163
  else
164
    match%bind
165
      Mina_networking.get_ancestry t.network sender.Peer.peer_id
×
166
        (With_hash.map_hash candidate_consensus_state
×
167
           ~f:State_hash.State_hashes.state_hash )
168
    with
169
    | Error e ->
×
170
        [%log error]
×
171
          ~metadata:[ ("error", Error_json.error_to_yojson e) ]
×
172
          !"Could not get the proof of the root transition from the network: \
173
            $error" ;
174
        Deferred.return `Ignored
×
175
    | Ok peer_root_with_proof -> (
×
176
        match%bind
177
          Sync_handler.Root.verify
×
178
            ~context:(module Context)
179
            ~verifier:t.verifier ~genesis_constants candidate_consensus_state
180
            peer_root_with_proof.data
181
        with
182
        | Ok (`Root root, `Best_tip best_tip) ->
×
183
            if done_syncing_root root_sync_ledger then return `Ignored
×
184
            else
185
              start_sync_job_with_peer ~sender ~root_sync_ledger t best_tip root
×
186
        | Error e ->
×
187
            return (received_bad_proof t sender e |> Fn.const `Ignored) )
×
188

189
(** A helper function that wraps the calls to Sync_ledger and iterate through
190
    incoming transitions, add those to the transition_cache and calls
191
    [on_transition] function. *)
192
let sync_ledger ({ context = (module Context); _ } as t) ~preferred
193
    ~root_sync_ledger ~transition_graph ~sync_ledger_reader ~genesis_constants =
194
  let open Context in
×
195
  let query_reader = Sync_ledger.Db.query_reader root_sync_ledger in
196
  let response_writer = Sync_ledger.Db.answer_writer root_sync_ledger in
×
197
  Mina_networking.glue_sync_ledger ~preferred t.network query_reader
×
198
    response_writer ;
199
  Reader.iter sync_ledger_reader ~f:(fun (b_or_h, `Valid_cb vc) ->
×
200
      let header_with_hash, sender, transition_cache_element =
×
201
        match b_or_h with
202
        | `Block b_env ->
×
203
            ( Envelope.Incoming.data b_env
×
204
              |> Mina_block.Validation.block_with_hash
×
205
              |> With_hash.map ~f:Mina_block.header
×
206
            , Envelope.Incoming.remote_sender_exn b_env
×
207
            , Envelope.Incoming.map ~f:(fun x -> `Block x) b_env )
×
208
        | `Header h_env ->
×
209
            ( Envelope.Incoming.data h_env
×
210
              |> Mina_block.Validation.header_with_hash
×
211
            , Envelope.Incoming.remote_sender_exn h_env
×
212
            , Envelope.Incoming.map ~f:(fun x -> `Header x) h_env )
×
213
      in
214
      let previous_state_hash =
215
        With_hash.data header_with_hash
×
216
        |> Mina_block.Header.protocol_state
×
217
        |> Protocol_state.previous_state_hash
218
      in
219
      Transition_cache.add transition_graph ~parent:previous_state_hash
×
220
        (transition_cache_element, vc) ;
221
      (* TODO: Efficiently limiting the number of green threads in #1337 *)
222
      if
×
223
        worth_getting_root t
224
          (With_hash.map ~f:to_consensus_state header_with_hash)
×
225
      then (
×
226
        [%log trace] "Added the transition from sync_ledger_reader into cache"
×
227
          ~metadata:
228
            [ ( "state_hash"
229
              , State_hash.to_yojson
×
230
                  (State_hash.With_state_hashes.state_hash header_with_hash) )
×
231
            ; ( "header"
232
              , Mina_block.Header.to_yojson (With_hash.data header_with_hash) )
×
233
            ] ;
234

235
        Deferred.ignore_m
×
236
        @@ on_transition t ~sender ~root_sync_ledger ~genesis_constants
×
237
             header_with_hash )
238
      else Deferred.unit )
×
239

240
let external_transition_compare ~context:(module Context : CONTEXT) =
241
  let get_consensus_state =
×
242
    Fn.compose Protocol_state.consensus_state Mina_block.Header.protocol_state
243
  in
244
  Comparable.lift
×
245
    (fun existing candidate ->
246
      (* To prevent the logger to spam a lot of messsages, the logger input is set to null *)
247
      if
×
248
        State_hash.equal
249
          (State_hash.With_state_hashes.state_hash existing)
×
250
          (State_hash.With_state_hashes.state_hash candidate)
×
251
      then 0
×
252
      else if
×
253
        Consensus.Hooks.equal_select_status `Keep
254
        @@ Consensus.Hooks.select ~context:(module Context) ~existing ~candidate
255
      then -1
×
256
      else 1 )
×
257
    ~f:(With_hash.map ~f:get_consensus_state)
258

259
(** The entry point function for bootstrap controller. When bootstrap finished
260
    it would return a transition frontier with the root breadcrumb and a list
261
    of transitions collected during bootstrap.
262

263
    Bootstrap controller would do the following steps to contrust the
264
    transition frontier:
265
    1. Download the root snarked_ledger.
266
    2. Download the scan state and pending coinbases.
267
    3. Construct the staged ledger from the snarked ledger, scan state and
268
       pending coinbases.
269
    4. Synchronize the consensus local state if necessary.
270
    5. Close the old frontier and reload a new one from disk.
271
 *)
272
let run ~context:(module Context : CONTEXT) ~trust_system ~verifier ~network
273
    ~consensus_local_state ~transition_reader ~preferred_peers ~persistent_root
274
    ~persistent_frontier ~initial_root_transition ~catchup_mode =
275
  let open Context in
×
276
  O1trace.thread "bootstrap" (fun () ->
277
      let genesis_constants =
×
278
        Precomputed_values.genesis_constants precomputed_values
279
      in
280
      let constraint_constants = precomputed_values.constraint_constants in
×
281
      let rec loop previous_cycles =
282
        let sync_ledger_pipe = "sync ledger pipe" in
×
283
        let sync_ledger_reader, sync_ledger_writer =
284
          create ~name:sync_ledger_pipe
285
            (Buffered
286
               ( `Capacity 50
287
               , `Overflow
288
                   (Drop_head
289
                      (fun (b_or_h, `Valid_cb valid_cb) ->
290
                        let hash =
×
291
                          match b_or_h with
292
                          | `Block b_env ->
×
293
                              Envelope.Incoming.data b_env
×
294
                              |> Mina_block.Validation.block_with_hash
×
295
                              |> With_hash.hash
×
296
                          | `Header h_env ->
×
297
                              Envelope.Incoming.data h_env
×
298
                              |> Mina_block.Validation.header_with_hash
×
299
                              |> With_hash.hash
×
300
                        in
301
                        Mina_metrics.(
302
                          Counter.inc_one
×
303
                            Pipe.Drop_on_overflow.bootstrap_sync_ledger) ;
304
                        Mina_block.handle_dropped_transition ?valid_cb hash
305
                          ~pipe_name:sync_ledger_pipe ~logger ) ) ) )
306
        in
307
        don't_wait_for
×
308
          (transfer_while_writer_alive transition_reader sync_ledger_writer
×
309
             ~f:Fn.id ) ;
310
        let initial_root_transition =
×
311
          initial_root_transition |> Mina_block.Validated.remember
×
312
          |> Mina_block.Validation.reset_frontier_dependencies_validation
×
313
          |> Mina_block.Validation.reset_staged_ledger_diff_validation
314
        in
315
        let t =
×
316
          { network
317
          ; context = (module Context)
318
          ; trust_system
319
          ; verifier
320
          ; best_seen_transition = initial_root_transition
321
          ; current_root = initial_root_transition
322
          ; num_of_root_snarked_ledger_retargeted = 0
323
          }
324
        in
325
        let transition_graph = Transition_cache.create () in
326
        let temp_persistent_root_instance =
×
327
          Transition_frontier.Persistent_root.create_instance_exn
328
            persistent_root
329
        in
330
        let temp_snarked_ledger =
×
331
          Transition_frontier.Persistent_root.Instance.snarked_ledger
332
            temp_persistent_root_instance
333
        in
334
        (* step 1. download snarked_ledger *)
335
        let%bind sync_ledger_time, (hash, sender, expected_staged_ledger_hash) =
336
          time_deferred
×
337
            (let root_sync_ledger =
338
               Sync_ledger.Db.create temp_snarked_ledger ~logger ~trust_system
339
             in
340
             don't_wait_for
×
341
               (sync_ledger t ~preferred:preferred_peers ~root_sync_ledger
×
342
                  ~transition_graph ~sync_ledger_reader ~genesis_constants ) ;
343
             (* We ignore the resulting ledger returned here since it will always
344
                * be the same as the ledger we started with because we are syncing
345
                * a db ledger. *)
346
             let%map _, data = Sync_ledger.Db.valid_tree root_sync_ledger in
×
347
             Sync_ledger.Db.destroy root_sync_ledger ;
×
348
             data )
×
349
        in
350
        Mina_metrics.(
×
351
          Counter.inc Bootstrap.root_snarked_ledger_sync_ms
×
352
            Time.Span.(to_ms sync_ledger_time)) ;
×
353
        Mina_metrics.(
354
          Gauge.set Bootstrap.num_of_root_snarked_ledger_retargeted
×
355
            (Float.of_int t.num_of_root_snarked_ledger_retargeted)) ;
×
356
        (* step 2. Download scan state and pending coinbases. *)
357
        let%bind ( staged_ledger_data_download_time
358
                 , staged_ledger_construction_time
359
                 , staged_ledger_aux_result ) =
360
          let%bind ( staged_ledger_data_download_time
361
                   , staged_ledger_data_download_result ) =
362
            time_deferred
×
363
              (Mina_networking
364
               .get_staged_ledger_aux_and_pending_coinbases_at_hash t.network
×
365
                 sender.peer_id hash )
366
          in
367
          match staged_ledger_data_download_result with
×
368
          | Error err ->
×
369
              Deferred.return (staged_ledger_data_download_time, None, Error err)
370
          | Ok
×
371
              ( scan_state
372
              , expected_merkle_root
373
              , pending_coinbases
374
              , protocol_states ) -> (
375
              let%map staged_ledger_construction_result =
376
                O1trace.thread "construct_root_staged_ledger" (fun () ->
×
377
                    let open Deferred.Or_error.Let_syntax in
×
378
                    let received_staged_ledger_hash =
379
                      Staged_ledger_hash.of_aux_ledger_and_coinbase_hash
380
                        (Staged_ledger.Scan_state.hash scan_state)
×
381
                        expected_merkle_root pending_coinbases
382
                    in
383
                    [%log debug]
×
384
                      ~metadata:
385
                        [ ( "expected_staged_ledger_hash"
386
                          , Staged_ledger_hash.to_yojson
×
387
                              expected_staged_ledger_hash )
388
                        ; ( "received_staged_ledger_hash"
389
                          , Staged_ledger_hash.to_yojson
×
390
                              received_staged_ledger_hash )
391
                        ]
392
                      "Comparing $expected_staged_ledger_hash to \
393
                       $received_staged_ledger_hash" ;
394
                    let%bind new_root =
395
                      t.current_root
396
                      |> Mina_block.Validation
397
                         .skip_frontier_dependencies_validation
×
398
                           `This_block_belongs_to_a_detached_subtree
399
                      |> Mina_block.Validation.validate_staged_ledger_hash
×
400
                           (`Staged_ledger_already_materialized
401
                             received_staged_ledger_hash )
402
                      |> Result.map_error ~f:(fun _ ->
×
403
                             Error.of_string
×
404
                               "received faulty scan state from peer" )
405
                      |> Deferred.return
×
406
                    in
407
                    let protocol_states =
×
408
                      List.map protocol_states
409
                        ~f:(With_hash.of_data ~hash_data:Protocol_state.hashes)
410
                    in
411
                    let%bind protocol_states =
412
                      Staged_ledger.Scan_state.check_required_protocol_states
×
413
                        scan_state ~protocol_states
414
                      |> Deferred.return
×
415
                    in
416
                    let protocol_states_map =
×
417
                      protocol_states
418
                      |> List.map ~f:(fun ps ->
×
419
                             (State_hash.With_state_hashes.state_hash ps, ps) )
×
420
                      |> State_hash.Map.of_alist_exn
421
                    in
422
                    let get_state hash =
×
423
                      match Map.find protocol_states_map hash with
×
424
                      | None ->
×
425
                          let new_state_hash =
426
                            State_hash.With_state_hashes.state_hash
427
                              (fst new_root)
×
428
                          in
429
                          [%log error]
×
430
                            ~metadata:
431
                              [ ("new_root", State_hash.to_yojson new_state_hash)
×
432
                              ; ("state_hash", State_hash.to_yojson hash)
×
433
                              ]
434
                            "Protocol state (for scan state transactions) for \
435
                             $state_hash not found when bootstrapping to the \
436
                             new root $new_root" ;
437
                          Or_error.errorf
×
438
                            !"Protocol state (for scan state transactions) for \
×
439
                              %{sexp:State_hash.t} not found when \
440
                              bootstrapping to the new root \
441
                              %{sexp:State_hash.t}"
442
                            hash new_state_hash
443
                      | Some protocol_state ->
×
444
                          Ok (With_hash.data protocol_state)
×
445
                    in
446
                    (* step 3. Construct staged ledger from snarked ledger, scan state
447
                       and pending coinbases. *)
448
                    (* Construct the staged ledger before constructing the transition
449
                     * frontier in order to verify the scan state we received.
450
                     * TODO: reorganize the code to avoid doing this twice (#3480) *)
451
                    let open Deferred.Let_syntax in
452
                    let%map staged_ledger_construction_time, construction_result
453
                        =
454
                      time_deferred
×
455
                        (let open Deferred.Let_syntax in
456
                        let temp_mask =
457
                          Ledger.of_database temp_snarked_ledger
458
                        in
459
                        let%map result =
460
                          Staged_ledger
461
                          .of_scan_state_pending_coinbases_and_snarked_ledger
462
                            ~logger
463
                            ~snarked_local_state:
464
                              Mina_block.(
465
                                t.current_root |> Validation.block |> header
×
466
                                |> Header.protocol_state
×
467
                                |> Protocol_state.blockchain_state
×
468
                                |> Blockchain_state.snarked_local_state)
×
469
                            ~verifier ~constraint_constants ~scan_state
470
                            ~snarked_ledger:temp_mask ~expected_merkle_root
471
                            ~pending_coinbases ~get_state
472
                        in
473
                        ignore
×
474
                          ( Ledger.Maskable.unregister_mask_exn ~loc:__LOC__
×
475
                              temp_mask
476
                            : Ledger.unattached_mask ) ;
477
                        Result.map result
478
                          ~f:
479
                            (const
×
480
                               ( scan_state
481
                               , pending_coinbases
482
                               , new_root
483
                               , protocol_states ) ))
484
                    in
485
                    Ok (staged_ledger_construction_time, construction_result) )
×
486
              in
487
              match staged_ledger_construction_result with
×
488
              | Error err ->
×
489
                  (staged_ledger_data_download_time, None, Error err)
490
              | Ok (staged_ledger_construction_time, result) ->
×
491
                  ( staged_ledger_data_download_time
492
                  , Some staged_ledger_construction_time
493
                  , result ) )
494
        in
495
        Transition_frontier.Persistent_root.Instance.close
×
496
          temp_persistent_root_instance ;
497
        match staged_ledger_aux_result with
×
498
        | Error e ->
×
499
            let%bind () =
500
              Trust_system.(
501
                record t.trust_system logger sender
×
502
                  Actions.
503
                    ( Outgoing_connection_error
504
                    , Some
505
                        ( "Can't find scan state from the peer or received \
506
                           faulty scan state from the peer."
507
                        , [] ) ))
508
            in
509
            [%log error]
×
510
              ~metadata:
511
                [ ("error", Error_json.error_to_yojson e)
×
512
                ; ("state_hash", State_hash.to_yojson hash)
×
513
                ; ( "expected_staged_ledger_hash"
514
                  , Staged_ledger_hash.to_yojson expected_staged_ledger_hash )
×
515
                ]
516
              "Failed to find scan state for the transition with hash \
517
               $state_hash from the peer or received faulty scan state: \
518
               $error. Retry bootstrap" ;
519
            Writer.close sync_ledger_writer ;
×
520
            let this_cycle =
×
521
              { cycle_result = "failed to download and construct scan state"
522
              ; sync_ledger_time
523
              ; staged_ledger_data_download_time
524
              ; staged_ledger_construction_time
525
              ; local_state_sync_required = false
526
              ; local_state_sync_time = None
527
              }
528
            in
529
            loop (this_cycle :: previous_cycles)
530
        | Ok (scan_state, pending_coinbase, new_root, protocol_states) -> (
×
531
            let%bind () =
532
              Trust_system.(
533
                record t.trust_system logger sender
×
534
                  Actions.
535
                    ( Fulfilled_request
536
                    , Some ("Received valid scan state from peer", []) ))
537
            in
538
            let best_seen_block_with_hash, _ = t.best_seen_transition in
×
539
            let consensus_state =
540
              With_hash.data best_seen_block_with_hash
×
541
              |> Mina_block.header |> Mina_block.Header.protocol_state
×
542
              |> Protocol_state.consensus_state
543
            in
544
            (* step 4. Synchronize consensus local state if necessary *)
545
            let%bind ( local_state_sync_time
546
                     , (local_state_sync_required, local_state_sync_result) ) =
547
              time_deferred
×
548
                ( match
549
                    Consensus.Hooks.required_local_state_sync
550
                      ~constants:precomputed_values.consensus_constants
551
                      ~consensus_state ~local_state:consensus_local_state
552
                  with
553
                | None ->
×
554
                    [%log debug]
×
555
                      ~metadata:
556
                        [ ( "local_state"
557
                          , Consensus.Data.Local_state.to_yojson
×
558
                              consensus_local_state )
559
                        ; ( "consensus_state"
560
                          , Consensus.Data.Consensus_state.Value.to_yojson
×
561
                              consensus_state )
562
                        ]
563
                      "Not synchronizing consensus local state" ;
564
                    Deferred.return (false, Or_error.return ())
×
565
                | Some sync_jobs ->
×
566
                    [%log info] "Synchronizing consensus local state" ;
×
567
                    let%map result =
568
                      Consensus.Hooks.sync_local_state
×
569
                        ~context:(module Context)
570
                        ~local_state:consensus_local_state ~trust_system
571
                        ~glue_sync_ledger:
572
                          (Mina_networking.glue_sync_ledger t.network)
×
573
                        sync_jobs
574
                    in
575
                    (true, result) )
×
576
            in
577
            match local_state_sync_result with
×
578
            | Error e ->
×
579
                [%log error]
×
580
                  ~metadata:[ ("error", Error_json.error_to_yojson e) ]
×
581
                  "Local state sync failed: $error. Retry bootstrap" ;
582
                Writer.close sync_ledger_writer ;
×
583
                let this_cycle =
×
584
                  { cycle_result = "failed to synchronize local state"
585
                  ; sync_ledger_time
586
                  ; staged_ledger_data_download_time
587
                  ; staged_ledger_construction_time
588
                  ; local_state_sync_required
589
                  ; local_state_sync_time = Some local_state_sync_time
590
                  }
591
                in
592
                loop (this_cycle :: previous_cycles)
593
            | Ok () ->
×
594
                (* step 5. Close the old frontier and reload a new one from disk. *)
595
                let new_root_data : Transition_frontier.Root_data.Limited.t =
596
                  Transition_frontier.Root_data.Limited.create
597
                    ~transition:(Mina_block.Validated.lift new_root)
×
598
                    ~scan_state ~pending_coinbase ~protocol_states
599
                in
600
                let%bind () =
601
                  Transition_frontier.Persistent_frontier.reset_database_exn
×
602
                    persistent_frontier ~root_data:new_root_data
603
                    ~genesis_state_hash:
604
                      (State_hash.With_state_hashes.state_hash
×
605
                         precomputed_values.protocol_state_with_hashes )
606
                in
607
                (* TODO: lazy load db in persistent root to avoid unecessary opens like this *)
608
                Transition_frontier.Persistent_root.(
×
609
                  with_instance_exn persistent_root ~f:(fun instance ->
×
610
                      Instance.set_root_state_hash instance
×
611
                      @@ Mina_block.Validated.state_hash
×
612
                      @@ Mina_block.Validated.lift new_root )) ;
×
613
                let%map new_frontier =
614
                  let fail msg =
615
                    failwith
×
616
                      ( "failed to initialize transition frontier after \
617
                         bootstrapping: " ^ msg )
618
                  in
619
                  Transition_frontier.load
×
620
                    ~context:(module Context)
621
                    ~retry_with_fresh_db:false ~verifier ~consensus_local_state
622
                    ~persistent_root ~persistent_frontier ~catchup_mode ()
623
                  >>| function
×
624
                  | Ok frontier ->
×
625
                      frontier
626
                  | Error (`Failure msg) ->
×
627
                      fail msg
628
                  | Error `Bootstrap_required ->
×
629
                      fail
630
                        "bootstrap still required (indicates logical error in \
631
                         code)"
632
                  | Error `Persistent_frontier_malformed ->
×
633
                      fail "persistent frontier was malformed"
634
                  | Error `Snarked_ledger_mismatch ->
×
635
                      fail
636
                        "this should not happen, because we just reset the \
637
                         snarked_ledger"
638
                in
639
                [%str_log info] Bootstrap_complete ;
×
640
                let collected_transitions =
×
641
                  Transition_cache.data transition_graph
642
                in
643
                let logger =
×
644
                  Logger.extend logger
645
                    [ ( "context"
646
                      , `String "Filter collected transitions in bootstrap" )
647
                    ]
648
                in
649
                let root_consensus_state =
×
650
                  Transition_frontier.(
651
                    Breadcrumb.consensus_state_with_hashes (root new_frontier))
×
652
                in
653
                let filtered_collected_transitions =
654
                  List.filter collected_transitions
655
                    ~f:(fun (incoming_transition, _) ->
656
                      let transition =
×
657
                        Envelope.Incoming.data incoming_transition
×
658
                        |> Transition_cache.header_with_hash
659
                      in
660
                      Consensus.Hooks.equal_select_status `Take
×
661
                      @@ Consensus.Hooks.select
662
                           ~context:(module Context)
663
                           ~existing:root_consensus_state
664
                           ~candidate:
665
                             (With_hash.map
×
666
                                ~f:
667
                                  (Fn.compose Protocol_state.consensus_state
×
668
                                     Mina_block.Header.protocol_state )
669
                                transition ) )
670
                in
671
                [%log debug] "Sorting filtered transitions by consensus state"
×
672
                  ~metadata:[] ;
673
                let sorted_filtered_collected_transitions =
×
674
                  O1trace.sync_thread "sorting_collected_transitions" (fun () ->
675
                      List.sort filtered_collected_transitions
×
676
                        ~compare:
677
                          (Comparable.lift
×
678
                             ~f:(fun (x, _) ->
679
                               Transition_cache.header_with_hash
×
680
                               @@ Envelope.Incoming.data x )
×
681
                             (external_transition_compare
682
                                ~context:(module Context) ) ) )
683
                in
684
                let this_cycle =
×
685
                  { cycle_result = "success"
686
                  ; sync_ledger_time
687
                  ; staged_ledger_data_download_time
688
                  ; staged_ledger_construction_time
689
                  ; local_state_sync_required
690
                  ; local_state_sync_time = Some local_state_sync_time
691
                  }
692
                in
693
                ( this_cycle :: previous_cycles
694
                , (new_frontier, sorted_filtered_collected_transitions) ) )
695
      in
696
      let%map time_elapsed, (cycles, result) = time_deferred (loop []) in
×
697
      [%log info] "Bootstrap completed in $time_elapsed: $bootstrap_stats"
×
698
        ~metadata:
699
          [ ("time_elapsed", time_to_yojson time_elapsed)
×
700
          ; ( "bootstrap_stats"
701
            , `List (List.map ~f:bootstrap_cycle_stats_to_yojson cycles) )
×
702
          ] ;
703
      Mina_metrics.(
×
704
        Gauge.set Bootstrap.bootstrap_time_ms
×
705
          Core.Time.(Span.to_ms @@ time_elapsed)) ;
×
706
      result )
707

708
let%test_module "Bootstrap_controller tests" =
709
  ( module struct
710
    open Pipe_lib
711

712
    let max_frontier_length =
713
      Transition_frontier.global_max_length Genesis_constants.For_unit_tests.t
×
714

715
    let logger = Logger.create ()
×
716

717
    let trust_system =
718
      let s = Trust_system.null () in
719
      don't_wait_for
×
720
        (Pipe_lib.Strict_pipe.Reader.iter
×
721
           (Trust_system.upcall_pipe s)
×
722
           ~f:(const Deferred.unit) ) ;
×
723
      s
×
724

725
    let precomputed_values = Lazy.force Precomputed_values.for_unit_tests
×
726

727
    let proof_level = precomputed_values.proof_level
728

729
    let constraint_constants = precomputed_values.constraint_constants
730

731
    let compile_config = Mina_compile_config.For_unit_tests.t
732

733
    module Context = struct
734
      let logger = Logger.create ()
×
735

736
      let precomputed_values = precomputed_values
737

738
      let constraint_constants =
739
        Genesis_constants.For_unit_tests.Constraint_constants.t
740

741
      let consensus_constants = precomputed_values.consensus_constants
742

743
      let compile_config = compile_config
744
    end
745

746
    let verifier =
747
      Async.Thread_safe.block_on_async_exn (fun () ->
×
748
          Verifier.create ~logger ~proof_level ~constraint_constants
×
749
            ~conf_dir:None
750
            ~pids:(Child_processes.Termination.create_pid_table ())
×
751
            ~commit_id:"not specified for unit tests" () )
752

753
    module Genesis_ledger = (val precomputed_values.genesis_ledger)
754

755
    let downcast_transition ~sender transition =
756
      let transition =
×
757
        transition |> Mina_block.Validated.remember
×
758
        |> Mina_block.Validation.reset_frontier_dependencies_validation
×
759
        |> Mina_block.Validation.reset_staged_ledger_diff_validation
760
      in
761
      Envelope.Incoming.wrap ~data:transition
×
762
        ~sender:(Envelope.Sender.Remote sender)
763

764
    let downcast_breadcrumb ~sender breadcrumb =
765
      downcast_transition ~sender
×
766
        (Transition_frontier.Breadcrumb.validated_transition breadcrumb)
×
767

768
    let make_non_running_bootstrap ~genesis_root ~network =
769
      let transition =
×
770
        genesis_root
771
        |> Mina_block.Validation.reset_frontier_dependencies_validation
×
772
        |> Mina_block.Validation.reset_staged_ledger_diff_validation
773
      in
774
      { context = (module Context)
×
775
      ; trust_system
776
      ; verifier
777
      ; best_seen_transition = transition
778
      ; current_root = transition
779
      ; network
780
      ; num_of_root_snarked_ledger_retargeted = 0
781
      }
782

783
    let%test_unit "Bootstrap controller caches all transitions it is passed \
784
                   through the transition_reader" =
785
      let branch_size = (max_frontier_length * 2) + 2 in
×
786
      Quickcheck.test ~trials:1
787
        (let open Quickcheck.Generator.Let_syntax in
788
        (* we only need one node for this test, but we need more than one peer so that mina_networking does not throw an error *)
789
        let%bind fake_network =
790
          Fake_network.Generator.(
791
            gen ~precomputed_values ~verifier ~max_frontier_length
×
792
              ~compile_config [ fresh_peer; fresh_peer ]
793
              ~use_super_catchup:false)
794
        in
795
        let%map make_branch =
796
          Transition_frontier.Breadcrumb.For_tests.gen_seq ~precomputed_values
×
797
            ~verifier
798
            ~accounts_with_secret_keys:(Lazy.force Genesis_ledger.accounts)
×
799
            branch_size
800
        in
801
        let [ me; _ ] = fake_network.peer_networks in
×
802
        let branch =
803
          Async.Thread_safe.block_on_async_exn (fun () ->
804
              make_branch (Transition_frontier.root me.state.frontier) )
×
805
        in
806
        (fake_network, branch))
×
807
        ~f:(fun (fake_network, branch) ->
808
          let [ me; other ] = fake_network.peer_networks in
×
809
          let genesis_root =
810
            Transition_frontier.(
811
              Breadcrumb.validated_transition @@ root me.state.frontier)
×
812
            |> Mina_block.Validated.remember
813
          in
814
          let transition_graph = Transition_cache.create () in
×
815
          let sync_ledger_reader, sync_ledger_writer =
×
816
            Pipe_lib.Strict_pipe.create ~name:"sync_ledger_reader" Synchronous
817
          in
818
          let bootstrap =
×
819
            make_non_running_bootstrap ~genesis_root ~network:me.network
820
          in
821
          let root_sync_ledger =
822
            Sync_ledger.Db.create
823
              (Transition_frontier.root_snarked_ledger me.state.frontier)
×
824
              ~logger ~trust_system
825
          in
826
          Async.Thread_safe.block_on_async_exn (fun () ->
×
827
              let sync_deferred =
×
828
                sync_ledger bootstrap ~root_sync_ledger ~transition_graph
829
                  ~preferred:[] ~sync_ledger_reader
830
                  ~genesis_constants:Genesis_constants.For_unit_tests.t
831
              in
832
              let%bind () =
833
                Deferred.List.iter branch ~f:(fun breadcrumb ->
×
834
                    Strict_pipe.Writer.write sync_ledger_writer
×
835
                      ( `Block
836
                          (downcast_breadcrumb ~sender:other.peer breadcrumb)
×
837
                      , `Valid_cb None ) )
838
              in
839
              Strict_pipe.Writer.close sync_ledger_writer ;
×
840
              sync_deferred ) ;
×
841
          let expected_transitions =
×
842
            List.map branch
843
              ~f:
844
                (Fn.compose
×
845
                   (With_hash.map ~f:Mina_block.header)
846
                   (Fn.compose Mina_block.Validation.block_with_hash
×
847
                      (Fn.compose Mina_block.Validated.remember
×
848
                         Transition_frontier.Breadcrumb.validated_transition ) ) )
849
          in
850
          let saved_transitions =
×
851
            Transition_cache.data transition_graph
×
852
            |> List.map ~f:(fun (x, _) ->
853
                   Transition_cache.header_with_hash @@ Envelope.Incoming.data x )
×
854
          in
855
          let module E = struct
×
856
            module T = struct
857
              type t = Mina_block.Header.t State_hash.With_state_hashes.t
×
858
              [@@deriving sexp]
859

860
              let compare = external_transition_compare ~context:(module Context)
861
            end
862

863
            include Comparable.Make (T)
864
          end in
865
          [%test_result: E.Set.t]
×
866
            (E.Set.of_list saved_transitions)
×
867
            ~expect:(E.Set.of_list expected_transitions) )
×
868

869
    let run_bootstrap ~timeout_duration ~my_net ~transition_reader =
870
      let open Fake_network in
×
871
      let time_controller = Block_time.Controller.basic ~logger in
872
      let persistent_root =
873
        Transition_frontier.persistent_root my_net.state.frontier
874
      in
875
      let persistent_frontier =
×
876
        Transition_frontier.persistent_frontier my_net.state.frontier
877
      in
878
      let initial_root_transition =
×
879
        Transition_frontier.(
880
          Breadcrumb.validated_transition (root my_net.state.frontier))
×
881
      in
882
      let%bind () =
883
        Transition_frontier.close ~loc:__LOC__ my_net.state.frontier
×
884
      in
885
      [%log info] "bootstrap begin" ;
×
886
      Block_time.Timeout.await_exn time_controller ~timeout_duration
×
887
        (run
888
           ~context:(module Context)
889
           ~trust_system ~verifier ~network:my_net.network ~preferred_peers:[]
890
           ~consensus_local_state:my_net.state.consensus_local_state
891
           ~transition_reader ~persistent_root ~persistent_frontier
892
           ~catchup_mode:`Normal ~initial_root_transition )
893

894
    let assert_transitions_increasingly_sorted ~root
895
        (incoming_transitions : Transition_cache.element list) =
896
      let root =
×
897
        Transition_frontier.Breadcrumb.block root |> Mina_block.header
×
898
      in
899
      ignore
×
900
        ( List.fold_result ~init:root incoming_transitions
×
901
            ~f:(fun max_acc incoming_transition ->
902
              let With_hash.{ data = header; _ } =
×
903
                Transition_cache.header_with_hash
904
                  (Envelope.Incoming.data @@ fst incoming_transition)
×
905
              in
906
              let header_len h =
×
907
                Mina_block.Header.protocol_state h
×
908
                |> Protocol_state.consensus_state
×
909
                |> Consensus.Data.Consensus_state.blockchain_length
910
              in
911
              let open Result.Let_syntax in
912
              let%map () =
913
                Result.ok_if_true
×
914
                  Mina_numbers.Length.(header_len max_acc <= header_len header)
×
915
                  ~error:
916
                    (Error.of_string
×
917
                       "The blocks are not sorted in increasing order" )
918
              in
919
              header )
×
920
          |> Or_error.ok_exn
×
921
          : Mina_block.Header.t )
922

923
    let%test_unit "sync with one node after receiving a transition" =
924
      Quickcheck.test ~trials:1
×
925
        Fake_network.Generator.(
926
          gen ~precomputed_values ~verifier ~max_frontier_length
×
927
            ~use_super_catchup:false ~compile_config
928
            [ fresh_peer
929
            ; peer_with_branch
930
                ~frontier_branch_size:((max_frontier_length * 2) + 2)
931
            ])
932
        ~f:(fun fake_network ->
933
          let [ my_net; peer_net ] = fake_network.peer_networks in
×
934
          let transition_reader, transition_writer =
935
            Pipe_lib.Strict_pipe.create ~name:(__MODULE__ ^ __LOC__)
936
              (Buffered (`Capacity 10, `Overflow (Drop_head ignore)))
937
          in
938
          let block =
×
939
            Envelope.Incoming.wrap
940
              ~data:
941
                ( Transition_frontier.best_tip peer_net.state.frontier
×
942
                |> Transition_frontier.Breadcrumb.validated_transition
×
943
                |> Mina_block.Validated.remember
×
944
                |> Mina_block.Validation.reset_frontier_dependencies_validation
×
945
                |> Mina_block.Validation.reset_staged_ledger_diff_validation )
×
946
              ~sender:(Envelope.Sender.Remote peer_net.peer)
947
          in
948
          Pipe_lib.Strict_pipe.Writer.write transition_writer
949
            (`Block block, `Valid_cb None) ;
950
          let new_frontier, sorted_external_transitions =
×
951
            Async.Thread_safe.block_on_async_exn (fun () ->
952
                run_bootstrap
×
953
                  ~timeout_duration:(Block_time.Span.of_ms 30_000L)
×
954
                  ~my_net ~transition_reader )
955
          in
956
          assert_transitions_increasingly_sorted
×
957
            ~root:(Transition_frontier.root new_frontier)
×
958
            sorted_external_transitions ;
959
          [%test_result: Ledger_hash.t]
×
960
            ( Ledger.Db.merkle_root
×
961
            @@ Transition_frontier.root_snarked_ledger new_frontier )
×
962
            ~expect:
963
              ( Ledger.Db.merkle_root
×
964
              @@ Transition_frontier.root_snarked_ledger peer_net.state.frontier
×
965
              ) )
966

967
    let%test_unit "reconstruct staged_ledgers using \
968
                   of_scan_state_and_snarked_ledger" =
969
      Quickcheck.test ~trials:1
×
970
        (Transition_frontier.For_tests.gen ~precomputed_values ~verifier
×
971
           ~max_length:max_frontier_length ~size:max_frontier_length () )
972
        ~f:(fun frontier ->
973
          Thread_safe.block_on_async_exn
×
974
          @@ fun () ->
975
          Deferred.List.iter (Transition_frontier.all_breadcrumbs frontier)
×
976
            ~f:(fun breadcrumb ->
977
              let staged_ledger =
×
978
                Transition_frontier.Breadcrumb.staged_ledger breadcrumb
979
              in
980
              let expected_merkle_root =
×
981
                Staged_ledger.ledger staged_ledger |> Ledger.merkle_root
×
982
              in
983
              let snarked_ledger =
×
984
                Transition_frontier.root_snarked_ledger frontier
×
985
                |> Ledger.of_database
986
              in
987
              let snarked_local_state =
×
988
                Transition_frontier.root frontier
×
989
                |> Transition_frontier.Breadcrumb.protocol_state
×
990
                |> Protocol_state.blockchain_state
×
991
                |> Blockchain_state.snarked_local_state
992
              in
993
              let scan_state = Staged_ledger.scan_state staged_ledger in
×
994
              let get_state hash =
×
995
                match Transition_frontier.find_protocol_state frontier hash with
×
996
                | Some protocol_state ->
×
997
                    Ok protocol_state
998
                | None ->
×
999
                    Or_error.errorf
1000
                      !"Protocol state (for scan state transactions) for \
×
1001
                        %{sexp:State_hash.t} not found"
1002
                      hash
1003
              in
1004
              let pending_coinbases =
1005
                Staged_ledger.pending_coinbase_collection staged_ledger
1006
              in
1007
              let%map actual_staged_ledger =
1008
                Staged_ledger.of_scan_state_pending_coinbases_and_snarked_ledger
1009
                  ~scan_state ~logger ~verifier ~constraint_constants
1010
                  ~snarked_ledger ~snarked_local_state ~expected_merkle_root
1011
                  ~pending_coinbases ~get_state
1012
                |> Deferred.Or_error.ok_exn
×
1013
              in
1014
              assert (
×
1015
                Staged_ledger_hash.equal
×
1016
                  (Staged_ledger.hash staged_ledger)
×
1017
                  (Staged_ledger.hash actual_staged_ledger) ) ) )
×
1018

1019
    (*
1020
    let%test_unit "if we see a new transition that is better than the \
1021
                   transition that we are syncing from, than we should \
1022
                   retarget our root" =
1023
      Quickcheck.test ~trials:1
1024
        Fake_network.Generator.(
1025
          gen ~max_frontier_length
1026
            [ fresh_peer
1027
            ; peer_with_branch ~frontier_branch_size:max_frontier_length
1028
            ; peer_with_branch
1029
                ~frontier_branch_size:((max_frontier_length * 2) + 2) ])
1030
        ~f:(fun fake_network ->
1031
          let [me; weaker_chain; stronger_chain] =
1032
            fake_network.peer_networks
1033
          in
1034
          let transition_reader, transition_writer =
1035
            Pipe_lib.Strict_pipe.create ~name:(__MODULE__ ^ __LOC__)
1036
              (Buffered (`Capacity 10, `Overflow Drop_head))
1037
          in
1038
          Envelope.Incoming.wrap
1039
            ~data:
1040
              ( Transition_frontier.best_tip weaker_chain.state.frontier
1041
              |> Transition_frontier.Breadcrumb.validated_transition
1042
              |> Mina_block.Validated.to_initial_validated )
1043
            ~sender:
1044
              (Envelope.Sender.Remote
1045
                 (weaker_chain.peer.host, weaker_chain.peer.peer_id))
1046
          |> Pipe_lib.Strict_pipe.Writer.write transition_writer ;
1047
          Envelope.Incoming.wrap
1048
            ~data:
1049
              ( Transition_frontier.best_tip stronger_chain.state.frontier
1050
              |> Transition_frontier.Breadcrumb.validated_transition
1051
              |> Mina_block.Validated.to_initial_validated )
1052
            ~sender:
1053
              (Envelope.Sender.Remote
1054
                 (stronger_chain.peer.host, stronger_chain.peer.peer_id))
1055
          |> Pipe_lib.Strict_pipe.Writer.write transition_writer ;
1056
          let new_frontier, sorted_external_transitions =
1057
            Async.Thread_safe.block_on_async_exn (fun () ->
1058
                run_bootstrap
1059
                  ~timeout_duration:(Block_time.Span.of_ms 60_000L)
1060
                  ~my_net:me ~transition_reader )
1061
          in
1062
          assert_transitions_increasingly_sorted
1063
            ~root:(Transition_frontier.root new_frontier)
1064
            sorted_external_transitions ;
1065
          [%test_result: Ledger_hash.t]
1066
            ( Ledger.Db.merkle_root
1067
            @@ Transition_frontier.root_snarked_ledger new_frontier )
1068
            ~expect:
1069
              ( Ledger.Db.merkle_root
1070
              @@ Transition_frontier.root_snarked_ledger
1071
                   stronger_chain.state.frontier ) )
1072
*)
1073
  end )
2✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc