• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

MinaProtocol / mina / 2956

21 Nov 2024 07:57PM UTC coverage: 33.184% (+0.1%) from 33.077%
2956

Pull #16380

buildkite

dkijania
Merge branch 'compatible' into dkijania/merge/compatible_to_develop_21_11_24
Pull Request #16380: merge compatible to develop 21 11 24

263 of 457 new or added lines in 33 files covered. (57.55%)

11 existing lines in 9 files now uncovered.

22259 of 67078 relevant lines covered (33.18%)

10199.18 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.55
/src/lib/bootstrap_controller/bootstrap_controller.ml
1
(* Only show stdout for failed inline tests. *)
1✔
2
open Core
3
open Async
4
open Mina_base
5
module Ledger = Mina_ledger.Ledger
6
module Sync_ledger = Mina_ledger.Sync_ledger
7
open Mina_state
8
open Pipe_lib.Strict_pipe
9
open Network_peer
10
module Transition_cache = Transition_cache
11

12
module type CONTEXT = sig
13
  val logger : Logger.t
14

15
  val precomputed_values : Precomputed_values.t
16

17
  val constraint_constants : Genesis_constants.Constraint_constants.t
18

19
  val consensus_constants : Consensus.Constants.t
20

21
  val compile_config : Mina_compile_config.t
22
end
23

24
type Structured_log_events.t += Bootstrap_complete
25
  [@@deriving register_event { msg = "Bootstrap state: complete." }]
×
26

27
type t =
28
  { context : (module CONTEXT)
29
  ; trust_system : Trust_system.t
30
  ; verifier : Verifier.t
31
  ; mutable best_seen_transition : Mina_block.initial_valid_block
32
  ; mutable current_root : Mina_block.initial_valid_block
33
  ; network : Mina_networking.t
34
  ; mutable num_of_root_snarked_ledger_retargeted : int
35
  }
36

37
type time = Time.Span.t
38

39
let time_to_yojson span =
40
  `String (Printf.sprintf "%f seconds" (Time.Span.to_sec span))
×
41

42
type opt_time = time option
43

44
let opt_time_to_yojson = function
45
  | Some time ->
×
46
      time_to_yojson time
47
  | None ->
×
48
      `Null
49

50
(** An auxiliary data structure for collecting various metrics for boostrap controller. *)
51
type bootstrap_cycle_stats =
×
52
  { cycle_result : string
×
53
  ; sync_ledger_time : time
×
54
  ; staged_ledger_data_download_time : time
×
55
  ; staged_ledger_construction_time : opt_time
×
56
  ; local_state_sync_required : bool
×
57
  ; local_state_sync_time : opt_time
×
58
  }
59
[@@deriving to_yojson]
60

61
let time_deferred deferred =
62
  let start_time = Time.now () in
×
63
  let%map result = deferred in
64
  let end_time = Time.now () in
×
65
  (Time.diff end_time start_time, result)
×
66

67
let worth_getting_root ({ context = (module Context); _ } as t) candidate =
68
  let module Context = struct
×
69
    include Context
70

71
    let logger =
72
      Logger.extend logger
×
73
        [ ( "selection_context"
74
          , `String "Bootstrap_controller.worth_getting_root" )
75
        ]
76
  end in
77
  Consensus.Hooks.equal_select_status `Take
78
  @@ Consensus.Hooks.select
79
       ~context:(module Context)
80
       ~existing:
81
         ( t.best_seen_transition |> Mina_block.Validation.block_with_hash
×
82
         |> With_hash.map ~f:Mina_block.consensus_state )
×
83
       ~candidate
84

85
let received_bad_proof ({ context = (module Context); _ } as t) host e =
86
  let open Context in
×
87
  Trust_system.(
88
    record t.trust_system logger host
89
      Actions.
90
        ( Violated_protocol
91
        , Some
92
            ( "Bad ancestor proof: $error"
93
            , [ ("error", Error_json.error_to_yojson e) ] ) ))
×
94

95
let done_syncing_root root_sync_ledger =
96
  Option.is_some (Sync_ledger.Db.peek_valid_tree root_sync_ledger)
×
97

98
let should_sync ~root_sync_ledger t candidate_state =
99
  (not @@ done_syncing_root root_sync_ledger)
×
100
  && worth_getting_root t candidate_state
×
101

102
(** Update [Synced_ledger]'s target and [best_seen_transition] and [current_root] accordingly. *)
103
let start_sync_job_with_peer ~sender ~root_sync_ledger
104
    ({ context = (module Context); _ } as t) peer_best_tip peer_root =
105
  let open Context in
×
106
  let%bind () =
107
    Trust_system.(
108
      record t.trust_system logger sender
×
109
        Actions.
110
          ( Fulfilled_request
111
          , Some ("Received verified peer root and best tip", []) ))
112
  in
113
  t.best_seen_transition <- peer_best_tip ;
×
114
  t.current_root <- peer_root ;
115
  let blockchain_state =
116
    t.current_root |> Mina_block.Validation.block |> Mina_block.header
×
117
    |> Mina_block.Header.protocol_state |> Protocol_state.blockchain_state
×
118
  in
119
  let expected_staged_ledger_hash =
×
120
    blockchain_state |> Blockchain_state.staged_ledger_hash
121
  in
122
  let snarked_ledger_hash =
×
123
    blockchain_state |> Blockchain_state.snarked_ledger_hash
124
  in
125
  return
×
126
  @@
127
  match
128
    Sync_ledger.Db.new_goal root_sync_ledger
129
      (Frozen_ledger_hash.to_ledger_hash snarked_ledger_hash)
×
130
      ~data:
131
        ( State_hash.With_state_hashes.state_hash
×
132
          @@ Mina_block.Validation.block_with_hash t.current_root
×
133
        , sender
134
        , expected_staged_ledger_hash )
135
      ~equal:(fun (hash1, _, _) (hash2, _, _) -> State_hash.equal hash1 hash2)
×
136
  with
137
  | `New ->
×
138
      t.num_of_root_snarked_ledger_retargeted <-
139
        t.num_of_root_snarked_ledger_retargeted + 1 ;
140
      `Syncing_new_snarked_ledger
141
  | `Update_data ->
×
142
      `Updating_root_transition
143
  | `Repeat ->
×
144
      `Ignored
145

146
let to_consensus_state h =
147
  Mina_block.Header.protocol_state h |> Protocol_state.consensus_state
×
148

149
(** For each transition, this function would compare it with the existing one.
150
    If the incoming transition is better, then download the merkle list from
151
    that transition to its root and verify it. If we get a better root than
152
    the existing one, then reset the Sync_ledger's target by calling
153
    [start_sync_job_with_peer] function. *)
154
let on_transition ({ context = (module Context); _ } as t) ~sender
155
    ~root_sync_ledger ~genesis_constants candidate_header =
156
  let open Context in
×
157
  let candidate_consensus_state =
158
    With_hash.map ~f:to_consensus_state candidate_header
159
  in
160
  if not @@ should_sync ~root_sync_ledger t candidate_consensus_state then
×
161
    Deferred.return `Ignored
×
162
  else
163
    match%bind
164
      Mina_networking.get_ancestry t.network sender.Peer.peer_id
×
165
        (With_hash.map_hash candidate_consensus_state
×
166
           ~f:State_hash.State_hashes.state_hash )
167
    with
168
    | Error e ->
×
169
        [%log error]
×
170
          ~metadata:[ ("error", Error_json.error_to_yojson e) ]
×
171
          !"Could not get the proof of the root transition from the network: \
172
            $error" ;
173
        Deferred.return `Ignored
×
174
    | Ok peer_root_with_proof -> (
×
175
        match%bind
176
          Sync_handler.Root.verify
×
177
            ~context:(module Context)
178
            ~verifier:t.verifier ~genesis_constants candidate_consensus_state
179
            peer_root_with_proof.data
180
        with
181
        | Ok (`Root root, `Best_tip best_tip) ->
×
182
            if done_syncing_root root_sync_ledger then return `Ignored
×
183
            else
184
              start_sync_job_with_peer ~sender ~root_sync_ledger t best_tip root
×
185
        | Error e ->
×
186
            return (received_bad_proof t sender e |> Fn.const `Ignored) )
×
187

188
(** A helper function that wraps the calls to Sync_ledger and iterate through
189
    incoming transitions, add those to the transition_cache and calls
190
    [on_transition] function. *)
191
let sync_ledger ({ context = (module Context); _ } as t) ~preferred
192
    ~root_sync_ledger ~transition_graph ~sync_ledger_reader ~genesis_constants =
193
  let open Context in
×
194
  let query_reader = Sync_ledger.Db.query_reader root_sync_ledger in
195
  let response_writer = Sync_ledger.Db.answer_writer root_sync_ledger in
×
196
  Mina_networking.glue_sync_ledger ~preferred t.network query_reader
×
197
    response_writer ;
198
  Reader.iter sync_ledger_reader ~f:(fun (b_or_h, `Valid_cb vc) ->
×
199
      let header_with_hash, sender, transition_cache_element =
×
200
        match b_or_h with
201
        | `Block b_env ->
×
202
            ( Envelope.Incoming.data b_env
×
203
              |> Mina_block.Validation.block_with_hash
×
204
              |> With_hash.map ~f:Mina_block.header
×
205
            , Envelope.Incoming.remote_sender_exn b_env
×
206
            , Envelope.Incoming.map ~f:(fun x -> `Block x) b_env )
×
207
        | `Header h_env ->
×
208
            ( Envelope.Incoming.data h_env
×
209
              |> Mina_block.Validation.header_with_hash
×
210
            , Envelope.Incoming.remote_sender_exn h_env
×
211
            , Envelope.Incoming.map ~f:(fun x -> `Header x) h_env )
×
212
      in
213
      let previous_state_hash =
214
        With_hash.data header_with_hash
×
215
        |> Mina_block.Header.protocol_state
×
216
        |> Protocol_state.previous_state_hash
217
      in
218
      Transition_cache.add transition_graph ~parent:previous_state_hash
×
219
        (transition_cache_element, vc) ;
220
      (* TODO: Efficiently limiting the number of green threads in #1337 *)
221
      if
×
222
        worth_getting_root t
223
          (With_hash.map ~f:to_consensus_state header_with_hash)
×
224
      then (
×
225
        [%log trace] "Added the transition from sync_ledger_reader into cache"
×
226
          ~metadata:
227
            [ ( "state_hash"
228
              , State_hash.to_yojson
×
229
                  (State_hash.With_state_hashes.state_hash header_with_hash) )
×
230
            ; ( "header"
231
              , Mina_block.Header.to_yojson (With_hash.data header_with_hash) )
×
232
            ] ;
233

234
        Deferred.ignore_m
×
235
        @@ on_transition t ~sender ~root_sync_ledger ~genesis_constants
×
236
             header_with_hash )
237
      else Deferred.unit )
×
238

239
let external_transition_compare ~context:(module Context : CONTEXT) =
240
  let get_consensus_state =
×
241
    Fn.compose Protocol_state.consensus_state Mina_block.Header.protocol_state
242
  in
243
  Comparable.lift
×
244
    (fun existing candidate ->
245
      (* To prevent the logger to spam a lot of messsages, the logger input is set to null *)
246
      if
×
247
        State_hash.equal
248
          (State_hash.With_state_hashes.state_hash existing)
×
249
          (State_hash.With_state_hashes.state_hash candidate)
×
250
      then 0
×
251
      else if
×
252
        Consensus.Hooks.equal_select_status `Keep
253
        @@ Consensus.Hooks.select ~context:(module Context) ~existing ~candidate
254
      then -1
×
255
      else 1 )
×
256
    ~f:(With_hash.map ~f:get_consensus_state)
257

258
(** The entry point function for bootstrap controller. When bootstrap finished
259
    it would return a transition frontier with the root breadcrumb and a list
260
    of transitions collected during bootstrap.
261

262
    Bootstrap controller would do the following steps to contrust the
263
    transition frontier:
264
    1. Download the root snarked_ledger.
265
    2. Download the scan state and pending coinbases.
266
    3. Construct the staged ledger from the snarked ledger, scan state and
267
       pending coinbases.
268
    4. Synchronize the consensus local state if necessary.
269
    5. Close the old frontier and reload a new one from disk.
270
 *)
271
let run ~context:(module Context : CONTEXT) ~trust_system ~verifier ~network
272
    ~consensus_local_state ~transition_reader ~preferred_peers ~persistent_root
273
    ~persistent_frontier ~initial_root_transition ~catchup_mode =
274
  let open Context in
×
275
  O1trace.thread "bootstrap" (fun () ->
276
      let genesis_constants =
×
277
        Precomputed_values.genesis_constants precomputed_values
278
      in
279
      let constraint_constants = precomputed_values.constraint_constants in
×
280
      let rec loop previous_cycles =
281
        let sync_ledger_pipe = "sync ledger pipe" in
×
282
        let sync_ledger_reader, sync_ledger_writer =
283
          create ~name:sync_ledger_pipe
284
            (Buffered
285
               ( `Capacity 50
286
               , `Overflow
287
                   (Drop_head
288
                      (fun (b_or_h, `Valid_cb valid_cb) ->
289
                        let hash =
×
290
                          match b_or_h with
291
                          | `Block b_env ->
×
292
                              Envelope.Incoming.data b_env
×
293
                              |> Mina_block.Validation.block_with_hash
×
294
                              |> With_hash.hash
×
295
                          | `Header h_env ->
×
296
                              Envelope.Incoming.data h_env
×
297
                              |> Mina_block.Validation.header_with_hash
×
298
                              |> With_hash.hash
×
299
                        in
300
                        Mina_metrics.(
301
                          Counter.inc_one
×
302
                            Pipe.Drop_on_overflow.bootstrap_sync_ledger) ;
303
                        Mina_block.handle_dropped_transition ?valid_cb hash
304
                          ~pipe_name:sync_ledger_pipe ~logger ) ) ) )
305
        in
306
        don't_wait_for
×
307
          (transfer_while_writer_alive transition_reader sync_ledger_writer
×
308
             ~f:Fn.id ) ;
309
        let initial_root_transition =
×
310
          initial_root_transition |> Mina_block.Validated.remember
×
311
          |> Mina_block.Validation.reset_frontier_dependencies_validation
×
312
          |> Mina_block.Validation.reset_staged_ledger_diff_validation
313
        in
314
        let t =
×
315
          { network
316
          ; context = (module Context)
317
          ; trust_system
318
          ; verifier
319
          ; best_seen_transition = initial_root_transition
320
          ; current_root = initial_root_transition
321
          ; num_of_root_snarked_ledger_retargeted = 0
322
          }
323
        in
324
        let transition_graph = Transition_cache.create () in
325
        let temp_persistent_root_instance =
×
326
          Transition_frontier.Persistent_root.create_instance_exn
327
            persistent_root
328
        in
329
        let temp_snarked_ledger =
×
330
          Transition_frontier.Persistent_root.Instance.snarked_ledger
331
            temp_persistent_root_instance
332
        in
333
        (* step 1. download snarked_ledger *)
334
        let%bind sync_ledger_time, (hash, sender, expected_staged_ledger_hash) =
335
          time_deferred
×
336
            (let root_sync_ledger =
337
               Sync_ledger.Db.create temp_snarked_ledger ~logger ~trust_system
338
             in
339
             don't_wait_for
×
340
               (sync_ledger t ~preferred:preferred_peers ~root_sync_ledger
×
341
                  ~transition_graph ~sync_ledger_reader ~genesis_constants ) ;
342
             (* We ignore the resulting ledger returned here since it will always
343
                * be the same as the ledger we started with because we are syncing
344
                * a db ledger. *)
345
             let%map _, data = Sync_ledger.Db.valid_tree root_sync_ledger in
×
346
             Sync_ledger.Db.destroy root_sync_ledger ;
×
347
             data )
×
348
        in
349
        Mina_metrics.(
×
350
          Counter.inc Bootstrap.root_snarked_ledger_sync_ms
×
351
            Time.Span.(to_ms sync_ledger_time)) ;
×
352
        Mina_metrics.(
353
          Gauge.set Bootstrap.num_of_root_snarked_ledger_retargeted
×
354
            (Float.of_int t.num_of_root_snarked_ledger_retargeted)) ;
×
355
        (* step 2. Download scan state and pending coinbases. *)
356
        let%bind ( staged_ledger_data_download_time
357
                 , staged_ledger_construction_time
358
                 , staged_ledger_aux_result ) =
359
          let%bind ( staged_ledger_data_download_time
360
                   , staged_ledger_data_download_result ) =
361
            time_deferred
×
362
              (Mina_networking
363
               .get_staged_ledger_aux_and_pending_coinbases_at_hash t.network
×
364
                 sender.peer_id hash )
365
          in
366
          match staged_ledger_data_download_result with
×
367
          | Error err ->
×
368
              Deferred.return (staged_ledger_data_download_time, None, Error err)
369
          | Ok
×
370
              ( scan_state
371
              , expected_merkle_root
372
              , pending_coinbases
373
              , protocol_states ) -> (
374
              let%map staged_ledger_construction_result =
375
                O1trace.thread "construct_root_staged_ledger" (fun () ->
×
376
                    let open Deferred.Or_error.Let_syntax in
×
377
                    let received_staged_ledger_hash =
378
                      Staged_ledger_hash.of_aux_ledger_and_coinbase_hash
379
                        (Staged_ledger.Scan_state.hash scan_state)
×
380
                        expected_merkle_root pending_coinbases
381
                    in
382
                    [%log debug]
×
383
                      ~metadata:
384
                        [ ( "expected_staged_ledger_hash"
385
                          , Staged_ledger_hash.to_yojson
×
386
                              expected_staged_ledger_hash )
387
                        ; ( "received_staged_ledger_hash"
388
                          , Staged_ledger_hash.to_yojson
×
389
                              received_staged_ledger_hash )
390
                        ]
391
                      "Comparing $expected_staged_ledger_hash to \
392
                       $received_staged_ledger_hash" ;
393
                    let%bind new_root =
394
                      t.current_root
395
                      |> Mina_block.Validation
396
                         .skip_frontier_dependencies_validation
×
397
                           `This_block_belongs_to_a_detached_subtree
398
                      |> Mina_block.Validation.validate_staged_ledger_hash
×
399
                           (`Staged_ledger_already_materialized
400
                             received_staged_ledger_hash )
401
                      |> Result.map_error ~f:(fun _ ->
×
402
                             Error.of_string
×
403
                               "received faulty scan state from peer" )
404
                      |> Deferred.return
×
405
                    in
406
                    let protocol_states =
×
407
                      List.map protocol_states
408
                        ~f:(With_hash.of_data ~hash_data:Protocol_state.hashes)
409
                    in
410
                    let%bind protocol_states =
411
                      Staged_ledger.Scan_state.check_required_protocol_states
×
412
                        scan_state ~protocol_states
413
                      |> Deferred.return
×
414
                    in
415
                    let protocol_states_map =
×
416
                      protocol_states
417
                      |> List.map ~f:(fun ps ->
×
418
                             (State_hash.With_state_hashes.state_hash ps, ps) )
×
419
                      |> State_hash.Map.of_alist_exn
420
                    in
421
                    let get_state hash =
×
422
                      match Map.find protocol_states_map hash with
×
423
                      | None ->
×
424
                          let new_state_hash =
425
                            State_hash.With_state_hashes.state_hash
426
                              (fst new_root)
×
427
                          in
428
                          [%log error]
×
429
                            ~metadata:
430
                              [ ("new_root", State_hash.to_yojson new_state_hash)
×
431
                              ; ("state_hash", State_hash.to_yojson hash)
×
432
                              ]
433
                            "Protocol state (for scan state transactions) for \
434
                             $state_hash not found when bootstrapping to the \
435
                             new root $new_root" ;
436
                          Or_error.errorf
×
437
                            !"Protocol state (for scan state transactions) for \
×
438
                              %{sexp:State_hash.t} not found when \
439
                              bootstrapping to the new root \
440
                              %{sexp:State_hash.t}"
441
                            hash new_state_hash
442
                      | Some protocol_state ->
×
443
                          Ok (With_hash.data protocol_state)
×
444
                    in
445
                    (* step 3. Construct staged ledger from snarked ledger, scan state
446
                       and pending coinbases. *)
447
                    (* Construct the staged ledger before constructing the transition
448
                     * frontier in order to verify the scan state we received.
449
                     * TODO: reorganize the code to avoid doing this twice (#3480) *)
450
                    let open Deferred.Let_syntax in
451
                    let%map staged_ledger_construction_time, construction_result
452
                        =
453
                      time_deferred
×
454
                        (let open Deferred.Let_syntax in
455
                        let temp_mask =
456
                          Ledger.of_database temp_snarked_ledger
457
                        in
458
                        let%map result =
459
                          Staged_ledger
460
                          .of_scan_state_pending_coinbases_and_snarked_ledger
461
                            ~logger
462
                            ~snarked_local_state:
463
                              Mina_block.(
464
                                t.current_root |> Validation.block |> header
×
465
                                |> Header.protocol_state
×
466
                                |> Protocol_state.blockchain_state
×
467
                                |> Blockchain_state.snarked_local_state)
×
468
                            ~verifier ~constraint_constants ~scan_state
469
                            ~snarked_ledger:temp_mask ~expected_merkle_root
470
                            ~pending_coinbases ~get_state
471
                        in
472
                        ignore
×
473
                          ( Ledger.Maskable.unregister_mask_exn ~loc:__LOC__
×
474
                              temp_mask
475
                            : Ledger.unattached_mask ) ;
476
                        Result.map result
477
                          ~f:
478
                            (const
×
479
                               ( scan_state
480
                               , pending_coinbases
481
                               , new_root
482
                               , protocol_states ) ))
483
                    in
484
                    Ok (staged_ledger_construction_time, construction_result) )
×
485
              in
486
              match staged_ledger_construction_result with
×
487
              | Error err ->
×
488
                  (staged_ledger_data_download_time, None, Error err)
489
              | Ok (staged_ledger_construction_time, result) ->
×
490
                  ( staged_ledger_data_download_time
491
                  , Some staged_ledger_construction_time
492
                  , result ) )
493
        in
494
        Transition_frontier.Persistent_root.Instance.close
×
495
          temp_persistent_root_instance ;
496
        match staged_ledger_aux_result with
×
497
        | Error e ->
×
498
            let%bind () =
499
              Trust_system.(
500
                record t.trust_system logger sender
×
501
                  Actions.
502
                    ( Outgoing_connection_error
503
                    , Some
504
                        ( "Can't find scan state from the peer or received \
505
                           faulty scan state from the peer."
506
                        , [] ) ))
507
            in
508
            [%log error]
×
509
              ~metadata:
510
                [ ("error", Error_json.error_to_yojson e)
×
511
                ; ("state_hash", State_hash.to_yojson hash)
×
512
                ; ( "expected_staged_ledger_hash"
513
                  , Staged_ledger_hash.to_yojson expected_staged_ledger_hash )
×
514
                ]
515
              "Failed to find scan state for the transition with hash \
516
               $state_hash from the peer or received faulty scan state: \
517
               $error. Retry bootstrap" ;
518
            Writer.close sync_ledger_writer ;
×
519
            let this_cycle =
×
520
              { cycle_result = "failed to download and construct scan state"
521
              ; sync_ledger_time
522
              ; staged_ledger_data_download_time
523
              ; staged_ledger_construction_time
524
              ; local_state_sync_required = false
525
              ; local_state_sync_time = None
526
              }
527
            in
528
            loop (this_cycle :: previous_cycles)
529
        | Ok (scan_state, pending_coinbase, new_root, protocol_states) -> (
×
530
            let%bind () =
531
              Trust_system.(
532
                record t.trust_system logger sender
×
533
                  Actions.
534
                    ( Fulfilled_request
535
                    , Some ("Received valid scan state from peer", []) ))
536
            in
537
            let best_seen_block_with_hash, _ = t.best_seen_transition in
×
538
            let consensus_state =
539
              With_hash.data best_seen_block_with_hash
×
540
              |> Mina_block.header |> Mina_block.Header.protocol_state
×
541
              |> Protocol_state.consensus_state
542
            in
543
            (* step 4. Synchronize consensus local state if necessary *)
544
            let%bind ( local_state_sync_time
545
                     , (local_state_sync_required, local_state_sync_result) ) =
546
              time_deferred
×
547
                ( match
548
                    Consensus.Hooks.required_local_state_sync
549
                      ~constants:precomputed_values.consensus_constants
550
                      ~consensus_state ~local_state:consensus_local_state
551
                  with
552
                | None ->
×
553
                    [%log debug]
×
554
                      ~metadata:
555
                        [ ( "local_state"
556
                          , Consensus.Data.Local_state.to_yojson
×
557
                              consensus_local_state )
558
                        ; ( "consensus_state"
559
                          , Consensus.Data.Consensus_state.Value.to_yojson
×
560
                              consensus_state )
561
                        ]
562
                      "Not synchronizing consensus local state" ;
563
                    Deferred.return (false, Or_error.return ())
×
564
                | Some sync_jobs ->
×
565
                    [%log info] "Synchronizing consensus local state" ;
×
566
                    let%map result =
567
                      Consensus.Hooks.sync_local_state
×
568
                        ~context:(module Context)
569
                        ~local_state:consensus_local_state ~trust_system
570
                        ~glue_sync_ledger:
571
                          (Mina_networking.glue_sync_ledger t.network)
×
572
                        sync_jobs
573
                    in
574
                    (true, result) )
×
575
            in
576
            match local_state_sync_result with
×
577
            | Error e ->
×
578
                [%log error]
×
579
                  ~metadata:[ ("error", Error_json.error_to_yojson e) ]
×
580
                  "Local state sync failed: $error. Retry bootstrap" ;
581
                Writer.close sync_ledger_writer ;
×
582
                let this_cycle =
×
583
                  { cycle_result = "failed to synchronize local state"
584
                  ; sync_ledger_time
585
                  ; staged_ledger_data_download_time
586
                  ; staged_ledger_construction_time
587
                  ; local_state_sync_required
588
                  ; local_state_sync_time = Some local_state_sync_time
589
                  }
590
                in
591
                loop (this_cycle :: previous_cycles)
592
            | Ok () ->
×
593
                (* step 5. Close the old frontier and reload a new one from disk. *)
594
                let new_root_data : Transition_frontier.Root_data.Limited.t =
595
                  Transition_frontier.Root_data.Limited.create
596
                    ~transition:(Mina_block.Validated.lift new_root)
×
597
                    ~scan_state ~pending_coinbase ~protocol_states
598
                in
599
                let%bind () =
600
                  Transition_frontier.Persistent_frontier.reset_database_exn
×
601
                    persistent_frontier ~root_data:new_root_data
602
                    ~genesis_state_hash:
603
                      (State_hash.With_state_hashes.state_hash
×
604
                         precomputed_values.protocol_state_with_hashes )
605
                in
606
                (* TODO: lazy load db in persistent root to avoid unecessary opens like this *)
607
                Transition_frontier.Persistent_root.(
×
608
                  with_instance_exn persistent_root ~f:(fun instance ->
×
609
                      Instance.set_root_state_hash instance
×
610
                      @@ Mina_block.Validated.state_hash
×
611
                      @@ Mina_block.Validated.lift new_root )) ;
×
612
                let%map new_frontier =
613
                  let fail msg =
614
                    failwith
×
615
                      ( "failed to initialize transition frontier after \
616
                         bootstrapping: " ^ msg )
617
                  in
618
                  Transition_frontier.load
×
619
                    ~context:(module Context)
620
                    ~retry_with_fresh_db:false ~verifier ~consensus_local_state
621
                    ~persistent_root ~persistent_frontier ~catchup_mode ()
622
                  >>| function
×
623
                  | Ok frontier ->
×
624
                      frontier
625
                  | Error (`Failure msg) ->
×
626
                      fail msg
627
                  | Error `Bootstrap_required ->
×
628
                      fail
629
                        "bootstrap still required (indicates logical error in \
630
                         code)"
631
                  | Error `Persistent_frontier_malformed ->
×
632
                      fail "persistent frontier was malformed"
633
                  | Error `Snarked_ledger_mismatch ->
×
634
                      fail
635
                        "this should not happen, because we just reset the \
636
                         snarked_ledger"
637
                in
638
                [%str_log info] Bootstrap_complete ;
×
639
                let collected_transitions =
×
640
                  Transition_cache.data transition_graph
641
                in
642
                let logger =
×
643
                  Logger.extend logger
644
                    [ ( "context"
645
                      , `String "Filter collected transitions in bootstrap" )
646
                    ]
647
                in
648
                let root_consensus_state =
×
649
                  Transition_frontier.(
650
                    Breadcrumb.consensus_state_with_hashes (root new_frontier))
×
651
                in
652
                let filtered_collected_transitions =
653
                  List.filter collected_transitions
654
                    ~f:(fun (incoming_transition, _) ->
655
                      let transition =
×
656
                        Envelope.Incoming.data incoming_transition
×
657
                        |> Transition_cache.header_with_hash
658
                      in
659
                      Consensus.Hooks.equal_select_status `Take
×
660
                      @@ Consensus.Hooks.select
661
                           ~context:(module Context)
662
                           ~existing:root_consensus_state
663
                           ~candidate:
664
                             (With_hash.map
×
665
                                ~f:
666
                                  (Fn.compose Protocol_state.consensus_state
×
667
                                     Mina_block.Header.protocol_state )
668
                                transition ) )
669
                in
670
                [%log debug] "Sorting filtered transitions by consensus state"
×
671
                  ~metadata:[] ;
672
                let sorted_filtered_collected_transitions =
×
673
                  O1trace.sync_thread "sorting_collected_transitions" (fun () ->
674
                      List.sort filtered_collected_transitions
×
675
                        ~compare:
676
                          (Comparable.lift
×
677
                             ~f:(fun (x, _) ->
678
                               Transition_cache.header_with_hash
×
679
                               @@ Envelope.Incoming.data x )
×
680
                             (external_transition_compare
681
                                ~context:(module Context) ) ) )
682
                in
683
                let this_cycle =
×
684
                  { cycle_result = "success"
685
                  ; sync_ledger_time
686
                  ; staged_ledger_data_download_time
687
                  ; staged_ledger_construction_time
688
                  ; local_state_sync_required
689
                  ; local_state_sync_time = Some local_state_sync_time
690
                  }
691
                in
692
                ( this_cycle :: previous_cycles
693
                , (new_frontier, sorted_filtered_collected_transitions) ) )
694
      in
695
      let%map time_elapsed, (cycles, result) = time_deferred (loop []) in
×
696
      [%log info] "Bootstrap completed in $time_elapsed: $bootstrap_stats"
×
697
        ~metadata:
698
          [ ("time_elapsed", time_to_yojson time_elapsed)
×
699
          ; ( "bootstrap_stats"
700
            , `List (List.map ~f:bootstrap_cycle_stats_to_yojson cycles) )
×
701
          ] ;
702
      Mina_metrics.(
×
703
        Gauge.set Bootstrap.bootstrap_time_ms
×
704
          Core.Time.(Span.to_ms @@ time_elapsed)) ;
×
705
      result )
706

707
let%test_module "Bootstrap_controller tests" =
708
  ( module struct
709
    open Pipe_lib
710

711
    let max_frontier_length =
712
      Transition_frontier.global_max_length Genesis_constants.For_unit_tests.t
×
713

NEW
714
    let logger = Logger.null ()
×
715

716
    let () =
717
      (* Disable log messages from best_tip_diff logger. *)
NEW
718
      Logger.Consumer_registry.register ~commit_id:Mina_version.commit_id
×
NEW
719
        ~id:Logger.Logger_id.best_tip_diff ~processor:(Logger.Processor.raw ())
×
720
        ~transport:
NEW
721
          (Logger.Transport.create
×
722
             ( module struct
723
               type t = unit
724

NEW
725
               let transport () _ = ()
×
726
             end )
727
             () )
728
        ()
729

730
    let trust_system =
731
      let s = Trust_system.null () in
732
      don't_wait_for
×
733
        (Pipe_lib.Strict_pipe.Reader.iter
×
734
           (Trust_system.upcall_pipe s)
×
735
           ~f:(const Deferred.unit) ) ;
×
736
      s
×
737

738
    let precomputed_values = Lazy.force Precomputed_values.for_unit_tests
×
739

740
    let proof_level = precomputed_values.proof_level
741

742
    let constraint_constants = precomputed_values.constraint_constants
743

744
    let compile_config = Mina_compile_config.For_unit_tests.t
745

746
    module Context = struct
747
      let logger = logger
748

749
      let precomputed_values = precomputed_values
750

751
      let constraint_constants =
752
        Genesis_constants.For_unit_tests.Constraint_constants.t
753

754
      let consensus_constants = precomputed_values.consensus_constants
755

756
      let compile_config = compile_config
757
    end
758

759
    let verifier =
760
      Async.Thread_safe.block_on_async_exn (fun () ->
×
761
          Verifier.For_tests.default ~constraint_constants ~logger ~proof_level
×
762
            () )
763

764
    module Genesis_ledger = (val precomputed_values.genesis_ledger)
765

766
    let downcast_transition ~sender transition =
767
      let transition =
×
768
        transition |> Mina_block.Validated.remember
×
769
        |> Mina_block.Validation.reset_frontier_dependencies_validation
×
770
        |> Mina_block.Validation.reset_staged_ledger_diff_validation
771
      in
772
      Envelope.Incoming.wrap ~data:transition
×
773
        ~sender:(Envelope.Sender.Remote sender)
774

775
    let downcast_breadcrumb ~sender breadcrumb =
776
      downcast_transition ~sender
×
777
        (Transition_frontier.Breadcrumb.validated_transition breadcrumb)
×
778

779
    let make_non_running_bootstrap ~genesis_root ~network =
780
      let transition =
×
781
        genesis_root
782
        |> Mina_block.Validation.reset_frontier_dependencies_validation
×
783
        |> Mina_block.Validation.reset_staged_ledger_diff_validation
784
      in
785
      { context = (module Context)
×
786
      ; trust_system
787
      ; verifier
788
      ; best_seen_transition = transition
789
      ; current_root = transition
790
      ; network
791
      ; num_of_root_snarked_ledger_retargeted = 0
792
      }
793

794
    let%test_unit "Bootstrap controller caches all transitions it is passed \
795
                   through the transition_reader" =
796
      let branch_size = (max_frontier_length * 2) + 2 in
×
797
      Quickcheck.test ~trials:1
798
        (let open Quickcheck.Generator.Let_syntax in
799
        (* we only need one node for this test, but we need more than one peer so that mina_networking does not throw an error *)
800
        let%bind fake_network =
801
          Fake_network.Generator.(
802
            gen ~precomputed_values ~verifier ~max_frontier_length
×
803
              ~compile_config [ fresh_peer; fresh_peer ]
804
              ~use_super_catchup:false)
805
        in
806
        let%map make_branch =
807
          Transition_frontier.Breadcrumb.For_tests.gen_seq ~precomputed_values
×
808
            ~verifier
809
            ~accounts_with_secret_keys:(Lazy.force Genesis_ledger.accounts)
×
810
            branch_size
811
        in
812
        let [ me; _ ] = fake_network.peer_networks in
×
813
        let branch =
814
          Async.Thread_safe.block_on_async_exn (fun () ->
815
              make_branch (Transition_frontier.root me.state.frontier) )
×
816
        in
817
        (fake_network, branch))
×
818
        ~f:(fun (fake_network, branch) ->
819
          let [ me; other ] = fake_network.peer_networks in
×
820
          let genesis_root =
821
            Transition_frontier.(
822
              Breadcrumb.validated_transition @@ root me.state.frontier)
×
823
            |> Mina_block.Validated.remember
824
          in
825
          let transition_graph = Transition_cache.create () in
×
826
          let sync_ledger_reader, sync_ledger_writer =
×
827
            Pipe_lib.Strict_pipe.create ~name:"sync_ledger_reader" Synchronous
828
          in
829
          let bootstrap =
×
830
            make_non_running_bootstrap ~genesis_root ~network:me.network
831
          in
832
          let root_sync_ledger =
833
            Sync_ledger.Db.create
834
              (Transition_frontier.root_snarked_ledger me.state.frontier)
×
835
              ~logger ~trust_system
836
          in
837
          Async.Thread_safe.block_on_async_exn (fun () ->
×
838
              let sync_deferred =
×
839
                sync_ledger bootstrap ~root_sync_ledger ~transition_graph
840
                  ~preferred:[] ~sync_ledger_reader
841
                  ~genesis_constants:Genesis_constants.For_unit_tests.t
842
              in
843
              let%bind () =
844
                Deferred.List.iter branch ~f:(fun breadcrumb ->
×
845
                    Strict_pipe.Writer.write sync_ledger_writer
×
846
                      ( `Block
847
                          (downcast_breadcrumb ~sender:other.peer breadcrumb)
×
848
                      , `Valid_cb None ) )
849
              in
850
              Strict_pipe.Writer.close sync_ledger_writer ;
×
851
              sync_deferred ) ;
×
852
          let expected_transitions =
×
853
            List.map branch
854
              ~f:
855
                (Fn.compose
×
856
                   (With_hash.map ~f:Mina_block.header)
857
                   (Fn.compose Mina_block.Validation.block_with_hash
×
858
                      (Fn.compose Mina_block.Validated.remember
×
859
                         Transition_frontier.Breadcrumb.validated_transition ) ) )
860
          in
861
          let saved_transitions =
×
862
            Transition_cache.data transition_graph
×
863
            |> List.map ~f:(fun (x, _) ->
864
                   Transition_cache.header_with_hash @@ Envelope.Incoming.data x )
×
865
          in
866
          let module E = struct
×
867
            module T = struct
868
              type t = Mina_block.Header.t State_hash.With_state_hashes.t
×
869
              [@@deriving sexp]
870

871
              let compare = external_transition_compare ~context:(module Context)
872
            end
873

874
            include Comparable.Make (T)
875
          end in
876
          [%test_result: E.Set.t]
×
877
            (E.Set.of_list saved_transitions)
×
878
            ~expect:(E.Set.of_list expected_transitions) )
×
879

880
    let run_bootstrap ~timeout_duration ~my_net ~transition_reader =
881
      let open Fake_network in
×
882
      let time_controller = Block_time.Controller.basic ~logger in
883
      let persistent_root =
884
        Transition_frontier.persistent_root my_net.state.frontier
885
      in
886
      let persistent_frontier =
×
887
        Transition_frontier.persistent_frontier my_net.state.frontier
888
      in
889
      let initial_root_transition =
×
890
        Transition_frontier.(
891
          Breadcrumb.validated_transition (root my_net.state.frontier))
×
892
      in
893
      let%bind () =
894
        Transition_frontier.close ~loc:__LOC__ my_net.state.frontier
×
895
      in
896
      [%log info] "bootstrap begin" ;
×
897
      Block_time.Timeout.await_exn time_controller ~timeout_duration
×
898
        (run
899
           ~context:(module Context)
900
           ~trust_system ~verifier ~network:my_net.network ~preferred_peers:[]
901
           ~consensus_local_state:my_net.state.consensus_local_state
902
           ~transition_reader ~persistent_root ~persistent_frontier
903
           ~catchup_mode:`Normal ~initial_root_transition )
904

905
    let assert_transitions_increasingly_sorted ~root
906
        (incoming_transitions : Transition_cache.element list) =
907
      let root =
×
908
        Transition_frontier.Breadcrumb.block root |> Mina_block.header
×
909
      in
910
      ignore
×
911
        ( List.fold_result ~init:root incoming_transitions
×
912
            ~f:(fun max_acc incoming_transition ->
913
              let With_hash.{ data = header; _ } =
×
914
                Transition_cache.header_with_hash
915
                  (Envelope.Incoming.data @@ fst incoming_transition)
×
916
              in
917
              let header_len h =
×
918
                Mina_block.Header.protocol_state h
×
919
                |> Protocol_state.consensus_state
×
920
                |> Consensus.Data.Consensus_state.blockchain_length
921
              in
922
              let open Result.Let_syntax in
923
              let%map () =
924
                Result.ok_if_true
×
925
                  Mina_numbers.Length.(header_len max_acc <= header_len header)
×
926
                  ~error:
927
                    (Error.of_string
×
928
                       "The blocks are not sorted in increasing order" )
929
              in
930
              header )
×
931
          |> Or_error.ok_exn
×
932
          : Mina_block.Header.t )
933

934
    let%test_unit "sync with one node after receiving a transition" =
935
      Quickcheck.test ~trials:1
×
936
        Fake_network.Generator.(
937
          gen ~precomputed_values ~verifier ~max_frontier_length
×
938
            ~use_super_catchup:false ~compile_config
939
            [ fresh_peer
940
            ; peer_with_branch
941
                ~frontier_branch_size:((max_frontier_length * 2) + 2)
942
            ])
943
        ~f:(fun fake_network ->
944
          let [ my_net; peer_net ] = fake_network.peer_networks in
×
945
          let transition_reader, transition_writer =
946
            Pipe_lib.Strict_pipe.create ~name:(__MODULE__ ^ __LOC__)
947
              (Buffered (`Capacity 10, `Overflow (Drop_head ignore)))
948
          in
949
          let block =
×
950
            Envelope.Incoming.wrap
951
              ~data:
952
                ( Transition_frontier.best_tip peer_net.state.frontier
×
953
                |> Transition_frontier.Breadcrumb.validated_transition
×
954
                |> Mina_block.Validated.remember
×
955
                |> Mina_block.Validation.reset_frontier_dependencies_validation
×
956
                |> Mina_block.Validation.reset_staged_ledger_diff_validation )
×
957
              ~sender:(Envelope.Sender.Remote peer_net.peer)
958
          in
959
          Pipe_lib.Strict_pipe.Writer.write transition_writer
960
            (`Block block, `Valid_cb None) ;
961
          let new_frontier, sorted_external_transitions =
×
962
            Async.Thread_safe.block_on_async_exn (fun () ->
963
                run_bootstrap
×
964
                  ~timeout_duration:(Block_time.Span.of_ms 30_000L)
×
965
                  ~my_net ~transition_reader )
966
          in
967
          assert_transitions_increasingly_sorted
×
968
            ~root:(Transition_frontier.root new_frontier)
×
969
            sorted_external_transitions ;
970
          [%test_result: Ledger_hash.t]
×
971
            ( Ledger.Db.merkle_root
×
972
            @@ Transition_frontier.root_snarked_ledger new_frontier )
×
973
            ~expect:
974
              ( Ledger.Db.merkle_root
×
975
              @@ Transition_frontier.root_snarked_ledger peer_net.state.frontier
×
976
              ) )
977

978
    let%test_unit "reconstruct staged_ledgers using \
979
                   of_scan_state_and_snarked_ledger" =
980
      Quickcheck.test ~trials:1
×
981
        (Transition_frontier.For_tests.gen ~precomputed_values ~verifier
×
982
           ~max_length:max_frontier_length ~size:max_frontier_length () )
983
        ~f:(fun frontier ->
984
          Thread_safe.block_on_async_exn
×
985
          @@ fun () ->
986
          Deferred.List.iter (Transition_frontier.all_breadcrumbs frontier)
×
987
            ~f:(fun breadcrumb ->
988
              let staged_ledger =
×
989
                Transition_frontier.Breadcrumb.staged_ledger breadcrumb
990
              in
991
              let expected_merkle_root =
×
992
                Staged_ledger.ledger staged_ledger |> Ledger.merkle_root
×
993
              in
994
              let snarked_ledger =
×
995
                Transition_frontier.root_snarked_ledger frontier
×
996
                |> Ledger.of_database
997
              in
998
              let snarked_local_state =
×
999
                Transition_frontier.root frontier
×
1000
                |> Transition_frontier.Breadcrumb.protocol_state
×
1001
                |> Protocol_state.blockchain_state
×
1002
                |> Blockchain_state.snarked_local_state
1003
              in
1004
              let scan_state = Staged_ledger.scan_state staged_ledger in
×
1005
              let get_state hash =
×
1006
                match Transition_frontier.find_protocol_state frontier hash with
×
1007
                | Some protocol_state ->
×
1008
                    Ok protocol_state
1009
                | None ->
×
1010
                    Or_error.errorf
1011
                      !"Protocol state (for scan state transactions) for \
×
1012
                        %{sexp:State_hash.t} not found"
1013
                      hash
1014
              in
1015
              let pending_coinbases =
1016
                Staged_ledger.pending_coinbase_collection staged_ledger
1017
              in
1018
              let%map actual_staged_ledger =
1019
                Staged_ledger.of_scan_state_pending_coinbases_and_snarked_ledger
1020
                  ~scan_state ~logger ~verifier ~constraint_constants
1021
                  ~snarked_ledger ~snarked_local_state ~expected_merkle_root
1022
                  ~pending_coinbases ~get_state
1023
                |> Deferred.Or_error.ok_exn
×
1024
              in
1025
              assert (
×
1026
                Staged_ledger_hash.equal
×
1027
                  (Staged_ledger.hash staged_ledger)
×
1028
                  (Staged_ledger.hash actual_staged_ledger) ) ) )
×
1029

1030
    (*
1031
    let%test_unit "if we see a new transition that is better than the \
1032
                   transition that we are syncing from, than we should \
1033
                   retarget our root" =
1034
      Quickcheck.test ~trials:1
1035
        Fake_network.Generator.(
1036
          gen ~max_frontier_length
1037
            [ fresh_peer
1038
            ; peer_with_branch ~frontier_branch_size:max_frontier_length
1039
            ; peer_with_branch
1040
                ~frontier_branch_size:((max_frontier_length * 2) + 2) ])
1041
        ~f:(fun fake_network ->
1042
          let [me; weaker_chain; stronger_chain] =
1043
            fake_network.peer_networks
1044
          in
1045
          let transition_reader, transition_writer =
1046
            Pipe_lib.Strict_pipe.create ~name:(__MODULE__ ^ __LOC__)
1047
              (Buffered (`Capacity 10, `Overflow Drop_head))
1048
          in
1049
          Envelope.Incoming.wrap
1050
            ~data:
1051
              ( Transition_frontier.best_tip weaker_chain.state.frontier
1052
              |> Transition_frontier.Breadcrumb.validated_transition
1053
              |> Mina_block.Validated.to_initial_validated )
1054
            ~sender:
1055
              (Envelope.Sender.Remote
1056
                 (weaker_chain.peer.host, weaker_chain.peer.peer_id))
1057
          |> Pipe_lib.Strict_pipe.Writer.write transition_writer ;
1058
          Envelope.Incoming.wrap
1059
            ~data:
1060
              ( Transition_frontier.best_tip stronger_chain.state.frontier
1061
              |> Transition_frontier.Breadcrumb.validated_transition
1062
              |> Mina_block.Validated.to_initial_validated )
1063
            ~sender:
1064
              (Envelope.Sender.Remote
1065
                 (stronger_chain.peer.host, stronger_chain.peer.peer_id))
1066
          |> Pipe_lib.Strict_pipe.Writer.write transition_writer ;
1067
          let new_frontier, sorted_external_transitions =
1068
            Async.Thread_safe.block_on_async_exn (fun () ->
1069
                run_bootstrap
1070
                  ~timeout_duration:(Block_time.Span.of_ms 60_000L)
1071
                  ~my_net:me ~transition_reader )
1072
          in
1073
          assert_transitions_increasingly_sorted
1074
            ~root:(Transition_frontier.root new_frontier)
1075
            sorted_external_transitions ;
1076
          [%test_result: Ledger_hash.t]
1077
            ( Ledger.Db.merkle_root
1078
            @@ Transition_frontier.root_snarked_ledger new_frontier )
1079
            ~expect:
1080
              ( Ledger.Db.merkle_root
1081
              @@ Transition_frontier.root_snarked_ledger
1082
                   stronger_chain.state.frontier ) )
1083
*)
1084
  end )
2✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc