• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

MinaProtocol / mina / 3424

27 Feb 2025 08:50PM UTC coverage: 32.457% (-28.3%) from 60.796%
3424

push

buildkite

web-flow
Merge pull request #16392 from MinaProtocol/dkijania/do_not_publish_dockers_on_pr

Skip publishing dockers on PR

23218 of 71534 relevant lines covered (32.46%)

16977.68 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.54
/src/lib/bootstrap_controller/bootstrap_controller.ml
1
(* Only show stdout for failed inline tests. *)
4✔
2
open Core
3
open Async
4
open Mina_base
5
module Ledger = Mina_ledger.Ledger
6
module Sync_ledger = Mina_ledger.Sync_ledger
7
open Mina_state
8
open Pipe_lib.Strict_pipe
9
open Network_peer
10
module Transition_cache = Transition_cache
11

12
module type CONTEXT = sig
13
  val logger : Logger.t
14

15
  val precomputed_values : Precomputed_values.t
16

17
  val constraint_constants : Genesis_constants.Constraint_constants.t
18

19
  val consensus_constants : Consensus.Constants.t
20

21
  val ledger_sync_config : Syncable_ledger.daemon_config
22

23
  val proof_cache_db : Proof_cache_tag.cache_db
24
end
25

26
type Structured_log_events.t += Bootstrap_complete
27
  [@@deriving register_event { msg = "Bootstrap state: complete." }]
×
28

29
type t =
30
  { context : (module CONTEXT)
31
  ; trust_system : Trust_system.t
32
  ; verifier : Verifier.t
33
  ; mutable best_seen_transition : Mina_block.initial_valid_block
34
  ; mutable current_root : Mina_block.initial_valid_block
35
  ; network : Mina_networking.t
36
  ; mutable num_of_root_snarked_ledger_retargeted : int
37
  }
38

39
type time = Time.Span.t
40

41
let time_to_yojson span =
42
  `String (Printf.sprintf "%f seconds" (Time.Span.to_sec span))
×
43

44
type opt_time = time option
45

46
let opt_time_to_yojson = function
47
  | Some time ->
×
48
      time_to_yojson time
49
  | None ->
×
50
      `Null
51

52
(** An auxiliary data structure for collecting various metrics for boostrap controller. *)
53
type bootstrap_cycle_stats =
×
54
  { cycle_result : string
×
55
  ; sync_ledger_time : time
×
56
  ; staged_ledger_data_download_time : time
×
57
  ; staged_ledger_construction_time : opt_time
×
58
  ; local_state_sync_required : bool
×
59
  ; local_state_sync_time : opt_time
×
60
  }
61
[@@deriving to_yojson]
62

63
let time_deferred deferred =
64
  let start_time = Time.now () in
×
65
  let%map result = deferred in
66
  let end_time = Time.now () in
×
67
  (Time.diff end_time start_time, result)
×
68

69
let worth_getting_root ({ context = (module Context); _ } as t) candidate =
70
  let module Consensus_context = struct
×
71
    include Context
72

73
    let logger =
74
      Logger.extend logger
×
75
        [ ( "selection_context"
76
          , `String "Bootstrap_controller.worth_getting_root" )
77
        ]
78
  end in
79
  Consensus.Hooks.equal_select_status `Take
80
  @@ Consensus.Hooks.select
81
       ~context:(module Consensus_context)
82
       ~existing:
83
         ( t.best_seen_transition |> Mina_block.Validation.block_with_hash
×
84
         |> With_hash.map ~f:Mina_block.consensus_state )
×
85
       ~candidate
86

87
let received_bad_proof ({ context = (module Context); _ } as t) host e =
88
  let open Context in
×
89
  Trust_system.(
90
    record t.trust_system logger host
91
      Actions.
92
        ( Violated_protocol
93
        , Some
94
            ( "Bad ancestor proof: $error"
95
            , [ ("error", Error_json.error_to_yojson e) ] ) ))
×
96

97
let done_syncing_root root_sync_ledger =
98
  Option.is_some (Sync_ledger.Db.peek_valid_tree root_sync_ledger)
×
99

100
let should_sync ~root_sync_ledger t candidate_state =
101
  (not @@ done_syncing_root root_sync_ledger)
×
102
  && worth_getting_root t candidate_state
×
103

104
(** Update [Synced_ledger]'s target and [best_seen_transition] and [current_root] accordingly. *)
105
let start_sync_job_with_peer ~sender ~root_sync_ledger
106
    ({ context = (module Context); _ } as t) peer_best_tip peer_root =
107
  let open Context in
×
108
  let%bind () =
109
    Trust_system.(
110
      record t.trust_system logger sender
×
111
        Actions.
112
          ( Fulfilled_request
113
          , Some ("Received verified peer root and best tip", []) ))
114
  in
115
  t.best_seen_transition <- peer_best_tip ;
×
116
  t.current_root <- peer_root ;
117
  let blockchain_state =
118
    t.current_root |> Mina_block.Validation.block |> Mina_block.header
×
119
    |> Mina_block.Header.protocol_state |> Protocol_state.blockchain_state
×
120
  in
121
  let expected_staged_ledger_hash =
×
122
    blockchain_state |> Blockchain_state.staged_ledger_hash
123
  in
124
  let snarked_ledger_hash =
×
125
    blockchain_state |> Blockchain_state.snarked_ledger_hash
126
  in
127
  return
×
128
  @@
129
  match
130
    Sync_ledger.Db.new_goal root_sync_ledger
131
      (Frozen_ledger_hash.to_ledger_hash snarked_ledger_hash)
×
132
      ~data:
133
        ( State_hash.With_state_hashes.state_hash
×
134
          @@ Mina_block.Validation.block_with_hash t.current_root
×
135
        , sender
136
        , expected_staged_ledger_hash )
137
      ~equal:(fun (hash1, _, _) (hash2, _, _) -> State_hash.equal hash1 hash2)
×
138
  with
139
  | `New ->
×
140
      t.num_of_root_snarked_ledger_retargeted <-
141
        t.num_of_root_snarked_ledger_retargeted + 1 ;
142
      `Syncing_new_snarked_ledger
143
  | `Update_data ->
×
144
      `Updating_root_transition
145
  | `Repeat ->
×
146
      `Ignored
147

148
let to_consensus_state h =
149
  Mina_block.Header.protocol_state h |> Protocol_state.consensus_state
×
150

151
(** For each transition, this function would compare it with the existing one.
152
    If the incoming transition is better, then download the merkle list from
153
    that transition to its root and verify it. If we get a better root than
154
    the existing one, then reset the Sync_ledger's target by calling
155
    [start_sync_job_with_peer] function. *)
156
let on_transition ({ context = (module Context); _ } as t) ~sender
157
    ~root_sync_ledger candidate_header =
158
  let open Context in
×
159
  let candidate_consensus_state =
160
    With_hash.map ~f:to_consensus_state candidate_header
161
  in
162
  if not @@ should_sync ~root_sync_ledger t candidate_consensus_state then
×
163
    Deferred.return `Ignored
×
164
  else
165
    match%bind
166
      Mina_networking.get_ancestry t.network sender.Peer.peer_id
×
167
        (With_hash.map_hash candidate_consensus_state
×
168
           ~f:State_hash.State_hashes.state_hash )
169
    with
170
    | Error e ->
×
171
        [%log error]
×
172
          ~metadata:[ ("error", Error_json.error_to_yojson e) ]
×
173
          !"Could not get the proof of the root transition from the network: \
174
            $error" ;
175
        Deferred.return `Ignored
×
176
    | Ok peer_root_with_proof -> (
×
177
        match%bind
178
          Sync_handler.Root.verify
×
179
            ~context:(module Context)
180
            ~verifier:t.verifier candidate_consensus_state
181
            peer_root_with_proof.data
182
        with
183
        | Ok (`Root root, `Best_tip best_tip) ->
×
184
            if done_syncing_root root_sync_ledger then return `Ignored
×
185
            else
186
              start_sync_job_with_peer ~sender ~root_sync_ledger t best_tip root
×
187
        | Error e ->
×
188
            return (received_bad_proof t sender e |> Fn.const `Ignored) )
×
189

190
(** A helper function that wraps the calls to Sync_ledger and iterate through
191
    incoming transitions, add those to the transition_cache and calls
192
    [on_transition] function. *)
193
let sync_ledger ({ context = (module Context); _ } as t) ~preferred
194
    ~root_sync_ledger ~transition_graph ~sync_ledger_reader =
195
  let open Context in
×
196
  let query_reader = Sync_ledger.Db.query_reader root_sync_ledger in
197
  let response_writer = Sync_ledger.Db.answer_writer root_sync_ledger in
×
198
  Mina_networking.glue_sync_ledger ~preferred t.network query_reader
×
199
    response_writer ;
200
  Reader.iter sync_ledger_reader ~f:(fun (b_or_h, `Valid_cb vc) ->
×
201
      let header_with_hash, sender, transition_cache_element =
×
202
        match b_or_h with
203
        | `Block b_env ->
×
204
            ( Envelope.Incoming.data b_env
×
205
              |> Mina_block.Validation.block_with_hash
×
206
              |> With_hash.map ~f:Mina_block.header
×
207
            , Envelope.Incoming.remote_sender_exn b_env
×
208
            , Envelope.Incoming.map ~f:(fun x -> `Block x) b_env )
×
209
        | `Header h_env ->
×
210
            ( Envelope.Incoming.data h_env
×
211
              |> Mina_block.Validation.header_with_hash
×
212
            , Envelope.Incoming.remote_sender_exn h_env
×
213
            , Envelope.Incoming.map ~f:(fun x -> `Header x) h_env )
×
214
      in
215
      let previous_state_hash =
216
        With_hash.data header_with_hash
×
217
        |> Mina_block.Header.protocol_state
×
218
        |> Protocol_state.previous_state_hash
219
      in
220
      Transition_cache.add transition_graph ~parent:previous_state_hash
×
221
        (transition_cache_element, vc) ;
222
      (* TODO: Efficiently limiting the number of green threads in #1337 *)
223
      if
×
224
        worth_getting_root t
225
          (With_hash.map ~f:to_consensus_state header_with_hash)
×
226
      then (
×
227
        [%log trace] "Added the transition from sync_ledger_reader into cache"
×
228
          ~metadata:
229
            [ ( "state_hash"
230
              , State_hash.to_yojson
×
231
                  (State_hash.With_state_hashes.state_hash header_with_hash) )
×
232
            ; ( "header"
233
              , Mina_block.Header.to_yojson (With_hash.data header_with_hash) )
×
234
            ] ;
235

236
        Deferred.ignore_m
×
237
        @@ on_transition t ~sender ~root_sync_ledger header_with_hash )
×
238
      else Deferred.unit )
×
239

240
let external_transition_compare ~context:(module Context : CONTEXT) =
241
  let get_consensus_state =
×
242
    Fn.compose Protocol_state.consensus_state Mina_block.Header.protocol_state
243
  in
244
  Comparable.lift
×
245
    (fun existing candidate ->
246
      (* To prevent the logger to spam a lot of messsages, the logger input is set to null *)
247
      if
×
248
        State_hash.equal
249
          (State_hash.With_state_hashes.state_hash existing)
×
250
          (State_hash.With_state_hashes.state_hash candidate)
×
251
      then 0
×
252
      else if
×
253
        Consensus.Hooks.equal_select_status `Keep
254
        @@ Consensus.Hooks.select ~context:(module Context) ~existing ~candidate
255
      then -1
×
256
      else 1 )
×
257
    ~f:(With_hash.map ~f:get_consensus_state)
258

259
(** The entry point function for bootstrap controller. When bootstrap finished
260
    it would return a transition frontier with the root breadcrumb and a list
261
    of transitions collected during bootstrap.
262

263
    Bootstrap controller would do the following steps to contrust the
264
    transition frontier:
265
    1. Download the root snarked_ledger.
266
    2. Download the scan state and pending coinbases.
267
    3. Construct the staged ledger from the snarked ledger, scan state and
268
       pending coinbases.
269
    4. Synchronize the consensus local state if necessary.
270
    5. Close the old frontier and reload a new one from disk.
271
 *)
272
let run ~context:(module Context : CONTEXT) ~trust_system ~verifier ~network
273
    ~consensus_local_state ~transition_reader ~preferred_peers ~persistent_root
274
    ~persistent_frontier ~initial_root_transition ~catchup_mode =
275
  let open Context in
×
276
  O1trace.thread "bootstrap" (fun () ->
277
      let rec loop previous_cycles =
×
278
        let sync_ledger_pipe = "sync ledger pipe" in
×
279
        let sync_ledger_reader, sync_ledger_writer =
280
          create ~name:sync_ledger_pipe
281
            (Buffered
282
               ( `Capacity 50
283
               , `Overflow
284
                   (Drop_head
285
                      (fun (b_or_h, `Valid_cb valid_cb) ->
286
                        let hash =
×
287
                          match b_or_h with
288
                          | `Block b_env ->
×
289
                              Envelope.Incoming.data b_env
×
290
                              |> Mina_block.Validation.block_with_hash
×
291
                              |> With_hash.hash
×
292
                          | `Header h_env ->
×
293
                              Envelope.Incoming.data h_env
×
294
                              |> Mina_block.Validation.header_with_hash
×
295
                              |> With_hash.hash
×
296
                        in
297
                        Mina_metrics.(
298
                          Counter.inc_one
×
299
                            Pipe.Drop_on_overflow.bootstrap_sync_ledger) ;
300
                        Mina_block.handle_dropped_transition ?valid_cb hash
301
                          ~pipe_name:sync_ledger_pipe ~logger ) ) ) )
302
        in
303
        don't_wait_for
×
304
          (transfer_while_writer_alive transition_reader sync_ledger_writer
×
305
             ~f:Fn.id ) ;
306
        let initial_root_transition =
×
307
          initial_root_transition |> Mina_block.Validated.remember
×
308
          |> Mina_block.Validation.reset_frontier_dependencies_validation
×
309
          |> Mina_block.Validation.reset_staged_ledger_diff_validation
310
        in
311
        let t =
×
312
          { network
313
          ; context = (module Context)
314
          ; trust_system
315
          ; verifier
316
          ; best_seen_transition = initial_root_transition
317
          ; current_root = initial_root_transition
318
          ; num_of_root_snarked_ledger_retargeted = 0
319
          }
320
        in
321
        let transition_graph = Transition_cache.create () in
322
        let temp_persistent_root_instance =
×
323
          Transition_frontier.Persistent_root.create_instance_exn
324
            persistent_root
325
        in
326
        let temp_snarked_ledger =
×
327
          Transition_frontier.Persistent_root.Instance.snarked_ledger
328
            temp_persistent_root_instance
329
        in
330
        (* step 1. download snarked_ledger *)
331
        let%bind sync_ledger_time, (hash, sender, expected_staged_ledger_hash) =
332
          time_deferred
×
333
            (let root_sync_ledger =
334
               Sync_ledger.Db.create temp_snarked_ledger
335
                 ~context:(module Context)
336
                 ~trust_system
337
             in
338
             don't_wait_for
×
339
               (sync_ledger t ~preferred:preferred_peers ~root_sync_ledger
×
340
                  ~transition_graph ~sync_ledger_reader ) ;
341
             (* We ignore the resulting ledger returned here since it will always
342
                * be the same as the ledger we started with because we are syncing
343
                * a db ledger. *)
344
             let%map _, data = Sync_ledger.Db.valid_tree root_sync_ledger in
×
345
             Sync_ledger.Db.destroy root_sync_ledger ;
×
346
             data )
×
347
        in
348
        Mina_metrics.(
×
349
          Counter.inc Bootstrap.root_snarked_ledger_sync_ms
×
350
            Time.Span.(to_ms sync_ledger_time)) ;
×
351
        Mina_metrics.(
352
          Gauge.set Bootstrap.num_of_root_snarked_ledger_retargeted
×
353
            (Float.of_int t.num_of_root_snarked_ledger_retargeted)) ;
×
354
        (* step 2. Download scan state and pending coinbases. *)
355
        let%bind ( staged_ledger_data_download_time
356
                 , staged_ledger_construction_time
357
                 , staged_ledger_aux_result ) =
358
          let%bind ( staged_ledger_data_download_time
359
                   , staged_ledger_data_download_result ) =
360
            time_deferred
×
361
              (Mina_networking
362
               .get_staged_ledger_aux_and_pending_coinbases_at_hash t.network
×
363
                 sender.peer_id hash )
364
          in
365
          match staged_ledger_data_download_result with
×
366
          | Error err ->
×
367
              Deferred.return (staged_ledger_data_download_time, None, Error err)
368
          | Ok
×
369
              ( scan_state_uncached
370
              , expected_merkle_root
371
              , pending_coinbases
372
              , protocol_states ) -> (
373
              let%map staged_ledger_construction_result =
374
                O1trace.thread "construct_root_staged_ledger" (fun () ->
×
375
                    let open Deferred.Or_error.Let_syntax in
×
376
                    let received_staged_ledger_hash =
377
                      Staged_ledger_hash.of_aux_ledger_and_coinbase_hash
378
                        (Staged_ledger.Scan_state.Stable.Latest.hash
×
379
                           scan_state_uncached )
380
                        expected_merkle_root pending_coinbases
381
                    in
382
                    [%log debug]
×
383
                      ~metadata:
384
                        [ ( "expected_staged_ledger_hash"
385
                          , Staged_ledger_hash.to_yojson
×
386
                              expected_staged_ledger_hash )
387
                        ; ( "received_staged_ledger_hash"
388
                          , Staged_ledger_hash.to_yojson
×
389
                              received_staged_ledger_hash )
390
                        ]
391
                      "Comparing $expected_staged_ledger_hash to \
392
                       $received_staged_ledger_hash" ;
393
                    let%bind new_root =
394
                      t.current_root
395
                      |> Mina_block.Validation
396
                         .skip_frontier_dependencies_validation
×
397
                           `This_block_belongs_to_a_detached_subtree
398
                      |> Mina_block.Validation.validate_staged_ledger_hash
×
399
                           (`Staged_ledger_already_materialized
400
                             received_staged_ledger_hash )
401
                      |> Result.map_error ~f:(fun _ ->
×
402
                             Error.of_string
×
403
                               "received faulty scan state from peer" )
404
                      |> Deferred.return
×
405
                    in
406
                    let protocol_states =
×
407
                      List.map protocol_states
408
                        ~f:(With_hash.of_data ~hash_data:Protocol_state.hashes)
409
                    in
410
                    let scan_state =
×
411
                      Staged_ledger.Scan_state.write_all_proofs_to_disk
412
                        ~proof_cache_db scan_state_uncached
413
                    in
414
                    let%bind protocol_states =
415
                      Staged_ledger.Scan_state.check_required_protocol_states
×
416
                        scan_state ~protocol_states
417
                      |> Deferred.return
×
418
                    in
419
                    let protocol_states_map =
×
420
                      protocol_states
421
                      |> List.map ~f:(fun ps ->
×
422
                             (State_hash.With_state_hashes.state_hash ps, ps) )
×
423
                      |> State_hash.Map.of_alist_exn
424
                    in
425
                    let get_state hash =
×
426
                      match Map.find protocol_states_map hash with
×
427
                      | None ->
×
428
                          let new_state_hash =
429
                            State_hash.With_state_hashes.state_hash
430
                              (fst new_root)
×
431
                          in
432
                          [%log error]
×
433
                            ~metadata:
434
                              [ ("new_root", State_hash.to_yojson new_state_hash)
×
435
                              ; ("state_hash", State_hash.to_yojson hash)
×
436
                              ]
437
                            "Protocol state (for scan state transactions) for \
438
                             $state_hash not found when bootstrapping to the \
439
                             new root $new_root" ;
440
                          Or_error.errorf
×
441
                            !"Protocol state (for scan state transactions) for \
×
442
                              %{sexp:State_hash.t} not found when \
443
                              bootstrapping to the new root \
444
                              %{sexp:State_hash.t}"
445
                            hash new_state_hash
446
                      | Some protocol_state ->
×
447
                          Ok (With_hash.data protocol_state)
×
448
                    in
449
                    (* step 3. Construct staged ledger from snarked ledger, scan state
450
                       and pending coinbases. *)
451
                    (* Construct the staged ledger before constructing the transition
452
                     * frontier in order to verify the scan state we received.
453
                     * TODO: reorganize the code to avoid doing this twice (#3480) *)
454
                    let open Deferred.Let_syntax in
455
                    let%map staged_ledger_construction_time, construction_result
456
                        =
457
                      time_deferred
×
458
                        (let open Deferred.Let_syntax in
459
                        let temp_mask =
460
                          Ledger.of_database temp_snarked_ledger
461
                        in
462
                        let%map result =
463
                          Staged_ledger
464
                          .of_scan_state_pending_coinbases_and_snarked_ledger
465
                            ~logger
466
                            ~snarked_local_state:
467
                              Mina_block.(
468
                                t.current_root |> Validation.block |> header
×
469
                                |> Header.protocol_state
×
470
                                |> Protocol_state.blockchain_state
×
471
                                |> Blockchain_state.snarked_local_state)
×
472
                            ~verifier ~constraint_constants ~scan_state
473
                            ~snarked_ledger:temp_mask ~expected_merkle_root
474
                            ~pending_coinbases ~get_state
475
                        in
476
                        ignore
×
477
                          ( Ledger.Maskable.unregister_mask_exn ~loc:__LOC__
×
478
                              temp_mask
479
                            : Ledger.unattached_mask ) ;
480
                        Result.map result
481
                          ~f:
482
                            (const
×
483
                               ( scan_state
484
                               , pending_coinbases
485
                               , new_root
486
                               , protocol_states ) ))
487
                    in
488
                    Ok (staged_ledger_construction_time, construction_result) )
×
489
              in
490
              match staged_ledger_construction_result with
×
491
              | Error err ->
×
492
                  (staged_ledger_data_download_time, None, Error err)
493
              | Ok (staged_ledger_construction_time, result) ->
×
494
                  ( staged_ledger_data_download_time
495
                  , Some staged_ledger_construction_time
496
                  , result ) )
497
        in
498
        Transition_frontier.Persistent_root.Instance.close
×
499
          temp_persistent_root_instance ;
500
        match staged_ledger_aux_result with
×
501
        | Error e ->
×
502
            let%bind () =
503
              Trust_system.(
504
                record t.trust_system logger sender
×
505
                  Actions.
506
                    ( Outgoing_connection_error
507
                    , Some
508
                        ( "Can't find scan state from the peer or received \
509
                           faulty scan state from the peer."
510
                        , [] ) ))
511
            in
512
            [%log error]
×
513
              ~metadata:
514
                [ ("error", Error_json.error_to_yojson e)
×
515
                ; ("state_hash", State_hash.to_yojson hash)
×
516
                ; ( "expected_staged_ledger_hash"
517
                  , Staged_ledger_hash.to_yojson expected_staged_ledger_hash )
×
518
                ]
519
              "Failed to find scan state for the transition with hash \
520
               $state_hash from the peer or received faulty scan state: \
521
               $error. Retry bootstrap" ;
522
            Writer.close sync_ledger_writer ;
×
523
            let this_cycle =
×
524
              { cycle_result = "failed to download and construct scan state"
525
              ; sync_ledger_time
526
              ; staged_ledger_data_download_time
527
              ; staged_ledger_construction_time
528
              ; local_state_sync_required = false
529
              ; local_state_sync_time = None
530
              }
531
            in
532
            loop (this_cycle :: previous_cycles)
533
        | Ok (scan_state, pending_coinbase, new_root, protocol_states) -> (
×
534
            let%bind () =
535
              Trust_system.(
536
                record t.trust_system logger sender
×
537
                  Actions.
538
                    ( Fulfilled_request
539
                    , Some ("Received valid scan state from peer", []) ))
540
            in
541
            let best_seen_block_with_hash, _ = t.best_seen_transition in
×
542
            let consensus_state =
543
              With_hash.data best_seen_block_with_hash
×
544
              |> Mina_block.header |> Mina_block.Header.protocol_state
×
545
              |> Protocol_state.consensus_state
546
            in
547
            (* step 4. Synchronize consensus local state if necessary *)
548
            let%bind ( local_state_sync_time
549
                     , (local_state_sync_required, local_state_sync_result) ) =
550
              time_deferred
×
551
                ( match
552
                    Consensus.Hooks.required_local_state_sync
553
                      ~constants:precomputed_values.consensus_constants
554
                      ~consensus_state ~local_state:consensus_local_state
555
                  with
556
                | None ->
×
557
                    [%log debug]
×
558
                      ~metadata:
559
                        [ ( "local_state"
560
                          , Consensus.Data.Local_state.to_yojson
×
561
                              consensus_local_state )
562
                        ; ( "consensus_state"
563
                          , Consensus.Data.Consensus_state.Value.to_yojson
×
564
                              consensus_state )
565
                        ]
566
                      "Not synchronizing consensus local state" ;
567
                    Deferred.return (false, Or_error.return ())
×
568
                | Some sync_jobs ->
×
569
                    [%log info] "Synchronizing consensus local state" ;
×
570
                    let%map result =
571
                      Consensus.Hooks.sync_local_state
×
572
                        ~context:(module Context)
573
                        ~local_state:consensus_local_state ~trust_system
574
                        ~glue_sync_ledger:
575
                          (Mina_networking.glue_sync_ledger t.network)
×
576
                        sync_jobs
577
                    in
578
                    (true, result) )
×
579
            in
580
            match local_state_sync_result with
×
581
            | Error e ->
×
582
                [%log error]
×
583
                  ~metadata:[ ("error", Error_json.error_to_yojson e) ]
×
584
                  "Local state sync failed: $error. Retry bootstrap" ;
585
                Writer.close sync_ledger_writer ;
×
586
                let this_cycle =
×
587
                  { cycle_result = "failed to synchronize local state"
588
                  ; sync_ledger_time
589
                  ; staged_ledger_data_download_time
590
                  ; staged_ledger_construction_time
591
                  ; local_state_sync_required
592
                  ; local_state_sync_time = Some local_state_sync_time
593
                  }
594
                in
595
                loop (this_cycle :: previous_cycles)
596
            | Ok () ->
×
597
                (* step 5. Close the old frontier and reload a new one from disk. *)
598
                let new_root_data : Transition_frontier.Root_data.Limited.t =
599
                  Transition_frontier.Root_data.Limited.create
600
                    ~transition:(Mina_block.Validated.lift new_root)
×
601
                    ~scan_state ~pending_coinbase ~protocol_states
602
                in
603
                let%bind () =
604
                  Transition_frontier.Persistent_frontier.reset_database_exn
×
605
                    persistent_frontier ~root_data:new_root_data
606
                    ~genesis_state_hash:
607
                      (State_hash.With_state_hashes.state_hash
×
608
                         precomputed_values.protocol_state_with_hashes )
609
                in
610
                (* TODO: lazy load db in persistent root to avoid unecessary opens like this *)
611
                Transition_frontier.Persistent_root.(
×
612
                  with_instance_exn persistent_root ~f:(fun instance ->
×
613
                      Instance.set_root_state_hash instance
×
614
                      @@ Mina_block.Validated.state_hash
×
615
                      @@ Mina_block.Validated.lift new_root )) ;
×
616
                let%map new_frontier =
617
                  let fail msg =
618
                    failwith
×
619
                      ( "failed to initialize transition frontier after \
620
                         bootstrapping: " ^ msg )
621
                  in
622
                  Transition_frontier.load
×
623
                    ~context:(module Context)
624
                    ~retry_with_fresh_db:false ~verifier ~consensus_local_state
625
                    ~persistent_root ~persistent_frontier ~catchup_mode ()
626
                  >>| function
×
627
                  | Ok frontier ->
×
628
                      frontier
629
                  | Error (`Failure msg) ->
×
630
                      fail msg
631
                  | Error `Bootstrap_required ->
×
632
                      fail
633
                        "bootstrap still required (indicates logical error in \
634
                         code)"
635
                  | Error `Persistent_frontier_malformed ->
×
636
                      fail "persistent frontier was malformed"
637
                  | Error `Snarked_ledger_mismatch ->
×
638
                      fail
639
                        "this should not happen, because we just reset the \
640
                         snarked_ledger"
641
                in
642
                [%str_log info] Bootstrap_complete ;
×
643
                let collected_transitions =
×
644
                  Transition_cache.data transition_graph
645
                in
646
                let logger =
×
647
                  Logger.extend logger
648
                    [ ( "context"
649
                      , `String "Filter collected transitions in bootstrap" )
650
                    ]
651
                in
652
                let root_consensus_state =
×
653
                  Transition_frontier.(
654
                    Breadcrumb.consensus_state_with_hashes (root new_frontier))
×
655
                in
656
                let filtered_collected_transitions =
657
                  List.filter collected_transitions
658
                    ~f:(fun (incoming_transition, _) ->
659
                      let transition =
×
660
                        Envelope.Incoming.data incoming_transition
×
661
                        |> Transition_cache.header_with_hash
662
                      in
663
                      Consensus.Hooks.equal_select_status `Take
×
664
                      @@ Consensus.Hooks.select
665
                           ~context:(module Context)
666
                           ~existing:root_consensus_state
667
                           ~candidate:
668
                             (With_hash.map
×
669
                                ~f:
670
                                  (Fn.compose Protocol_state.consensus_state
×
671
                                     Mina_block.Header.protocol_state )
672
                                transition ) )
673
                in
674
                [%log debug] "Sorting filtered transitions by consensus state"
×
675
                  ~metadata:[] ;
676
                let sorted_filtered_collected_transitions =
×
677
                  O1trace.sync_thread "sorting_collected_transitions" (fun () ->
678
                      List.sort filtered_collected_transitions
×
679
                        ~compare:
680
                          (Comparable.lift
×
681
                             ~f:(fun (x, _) ->
682
                               Transition_cache.header_with_hash
×
683
                               @@ Envelope.Incoming.data x )
×
684
                             (external_transition_compare
685
                                ~context:(module Context) ) ) )
686
                in
687
                let this_cycle =
×
688
                  { cycle_result = "success"
689
                  ; sync_ledger_time
690
                  ; staged_ledger_data_download_time
691
                  ; staged_ledger_construction_time
692
                  ; local_state_sync_required
693
                  ; local_state_sync_time = Some local_state_sync_time
694
                  }
695
                in
696
                ( this_cycle :: previous_cycles
697
                , (new_frontier, sorted_filtered_collected_transitions) ) )
698
      in
699
      let%map time_elapsed, (cycles, result) = time_deferred (loop []) in
×
700
      [%log info] "Bootstrap completed in $time_elapsed: $bootstrap_stats"
×
701
        ~metadata:
702
          [ ("time_elapsed", time_to_yojson time_elapsed)
×
703
          ; ( "bootstrap_stats"
704
            , `List (List.map ~f:bootstrap_cycle_stats_to_yojson cycles) )
×
705
          ] ;
706
      Mina_metrics.(
×
707
        Gauge.set Bootstrap.bootstrap_time_ms
×
708
          Core.Time.(Span.to_ms @@ time_elapsed)) ;
×
709
      result )
710

711
let%test_module "Bootstrap_controller tests" =
712
  ( module struct
713
    open Pipe_lib
714

715
    let max_frontier_length =
716
      Transition_frontier.global_max_length Genesis_constants.For_unit_tests.t
×
717

718
    let logger = Logger.null ()
×
719

720
    let () =
721
      (* Disable log messages from best_tip_diff logger. *)
722
      Logger.Consumer_registry.register ~commit_id:""
×
723
        ~id:Logger.Logger_id.best_tip_diff ~processor:(Logger.Processor.raw ())
×
724
        ~transport:
725
          (Logger.Transport.create
×
726
             ( module struct
727
               type t = unit
728

729
               let transport () _ = ()
×
730
             end )
731
             () )
732
        ()
733

734
    let trust_system =
735
      let s = Trust_system.null () in
736
      don't_wait_for
×
737
        (Pipe_lib.Strict_pipe.Reader.iter
×
738
           (Trust_system.upcall_pipe s)
×
739
           ~f:(const Deferred.unit) ) ;
×
740
      s
×
741

742
    let precomputed_values = Lazy.force Precomputed_values.for_unit_tests
×
743

744
    let proof_level = precomputed_values.proof_level
745

746
    let constraint_constants = precomputed_values.constraint_constants
747

748
    let ledger_sync_config =
749
      Syncable_ledger.create_config
×
750
        ~compile_config:Mina_compile_config.For_unit_tests.t
751
        ~max_subtree_depth:None ~default_subtree_depth:None ()
752

753
    module Context = struct
754
      let logger = logger
755

756
      let precomputed_values = precomputed_values
757

758
      let constraint_constants =
759
        Genesis_constants.For_unit_tests.Constraint_constants.t
760

761
      let consensus_constants = precomputed_values.consensus_constants
762

763
      let ledger_sync_config =
764
        Syncable_ledger.create_config
×
765
          ~compile_config:Mina_compile_config.For_unit_tests.t
766
          ~max_subtree_depth:None ~default_subtree_depth:None ()
767

768
      let proof_cache_db = Proof_cache_tag.For_tests.create_db ()
×
769
    end
770

771
    let verifier =
772
      Async.Thread_safe.block_on_async_exn (fun () ->
×
773
          Verifier.For_tests.default ~constraint_constants ~logger ~proof_level
×
774
            () )
775

776
    module Genesis_ledger = (val precomputed_values.genesis_ledger)
777

778
    let downcast_transition ~sender transition =
779
      let transition =
×
780
        transition |> Mina_block.Validated.remember
×
781
        |> Mina_block.Validation.reset_frontier_dependencies_validation
×
782
        |> Mina_block.Validation.reset_staged_ledger_diff_validation
783
      in
784
      Envelope.Incoming.wrap ~data:transition
×
785
        ~sender:(Envelope.Sender.Remote sender)
786

787
    let downcast_breadcrumb ~sender breadcrumb =
788
      downcast_transition ~sender
×
789
        (Transition_frontier.Breadcrumb.validated_transition breadcrumb)
×
790

791
    let make_non_running_bootstrap ~genesis_root ~network =
792
      let transition =
×
793
        genesis_root
794
        |> Mina_block.Validation.reset_frontier_dependencies_validation
×
795
        |> Mina_block.Validation.reset_staged_ledger_diff_validation
796
      in
797
      { context = (module Context)
×
798
      ; trust_system
799
      ; verifier
800
      ; best_seen_transition = transition
801
      ; current_root = transition
802
      ; network
803
      ; num_of_root_snarked_ledger_retargeted = 0
804
      }
805

806
    let%test_unit "Bootstrap controller caches all transitions it is passed \
807
                   through the transition_reader" =
808
      let branch_size = (max_frontier_length * 2) + 2 in
×
809
      Quickcheck.test ~trials:1
810
        (let open Quickcheck.Generator.Let_syntax in
811
        (* we only need one node for this test, but we need more than one peer so that mina_networking does not throw an error *)
812
        let%bind fake_network =
813
          Fake_network.Generator.(
814
            gen ~precomputed_values ~verifier ~max_frontier_length
×
815
              ~ledger_sync_config [ fresh_peer; fresh_peer ]
816
              ~use_super_catchup:false)
817
        in
818
        let%map make_branch =
819
          Transition_frontier.Breadcrumb.For_tests.gen_seq ~precomputed_values
×
820
            ~verifier
821
            ~accounts_with_secret_keys:(Lazy.force Genesis_ledger.accounts)
×
822
            branch_size
823
        in
824
        let [ me; _ ] = fake_network.peer_networks in
×
825
        let branch =
826
          Async.Thread_safe.block_on_async_exn (fun () ->
827
              make_branch (Transition_frontier.root me.state.frontier) )
×
828
        in
829
        (fake_network, branch))
×
830
        ~f:(fun (fake_network, branch) ->
831
          let [ me; other ] = fake_network.peer_networks in
×
832
          let genesis_root =
833
            Transition_frontier.(
834
              Breadcrumb.validated_transition @@ root me.state.frontier)
×
835
            |> Mina_block.Validated.remember
836
          in
837
          let transition_graph = Transition_cache.create () in
×
838
          let sync_ledger_reader, sync_ledger_writer =
×
839
            Pipe_lib.Strict_pipe.create ~name:"sync_ledger_reader" Synchronous
840
          in
841
          let bootstrap =
×
842
            make_non_running_bootstrap ~genesis_root ~network:me.network
843
          in
844
          let root_sync_ledger =
845
            Sync_ledger.Db.create
846
              (Transition_frontier.root_snarked_ledger me.state.frontier)
×
847
              ~context:(module Context)
848
              ~trust_system
849
          in
850
          Async.Thread_safe.block_on_async_exn (fun () ->
×
851
              let sync_deferred =
×
852
                sync_ledger bootstrap ~root_sync_ledger ~transition_graph
853
                  ~preferred:[] ~sync_ledger_reader
854
              in
855
              let%bind () =
856
                Deferred.List.iter branch ~f:(fun breadcrumb ->
×
857
                    Strict_pipe.Writer.write sync_ledger_writer
×
858
                      ( `Block
859
                          (downcast_breadcrumb ~sender:other.peer breadcrumb)
×
860
                      , `Valid_cb None ) )
861
              in
862
              Strict_pipe.Writer.close sync_ledger_writer ;
×
863
              sync_deferred ) ;
×
864
          let expected_transitions =
×
865
            List.map branch
866
              ~f:
867
                (Fn.compose
×
868
                   (With_hash.map ~f:Mina_block.header)
869
                   (Fn.compose Mina_block.Validation.block_with_hash
×
870
                      (Fn.compose Mina_block.Validated.remember
×
871
                         Transition_frontier.Breadcrumb.validated_transition ) ) )
872
          in
873
          let saved_transitions =
×
874
            Transition_cache.data transition_graph
×
875
            |> List.map ~f:(fun (x, _) ->
876
                   Transition_cache.header_with_hash @@ Envelope.Incoming.data x )
×
877
          in
878
          let module E = struct
×
879
            module T = struct
880
              type t = Mina_block.Header.t State_hash.With_state_hashes.t
×
881
              [@@deriving sexp]
882

883
              let compare = external_transition_compare ~context:(module Context)
884
            end
885

886
            include Comparable.Make (T)
887
          end in
888
          [%test_result: E.Set.t]
×
889
            (E.Set.of_list saved_transitions)
×
890
            ~expect:(E.Set.of_list expected_transitions) )
×
891

892
    let run_bootstrap ~timeout_duration ~my_net ~transition_reader =
893
      let open Fake_network in
×
894
      let time_controller = Block_time.Controller.basic ~logger in
895
      let persistent_root =
896
        Transition_frontier.persistent_root my_net.state.frontier
897
      in
898
      let persistent_frontier =
×
899
        Transition_frontier.persistent_frontier my_net.state.frontier
900
      in
901
      let initial_root_transition =
×
902
        Transition_frontier.(
903
          Breadcrumb.validated_transition (root my_net.state.frontier))
×
904
      in
905
      let%bind () =
906
        Transition_frontier.close ~loc:__LOC__ my_net.state.frontier
×
907
      in
908
      [%log info] "bootstrap begin" ;
×
909
      Block_time.Timeout.await_exn time_controller ~timeout_duration
×
910
        (run
911
           ~context:(module Context)
912
           ~trust_system ~verifier ~network:my_net.network ~preferred_peers:[]
913
           ~consensus_local_state:my_net.state.consensus_local_state
914
           ~transition_reader ~persistent_root ~persistent_frontier
915
           ~catchup_mode:`Normal ~initial_root_transition )
916

917
    let assert_transitions_increasingly_sorted ~root
918
        (incoming_transitions : Transition_cache.element list) =
919
      let root =
×
920
        Transition_frontier.Breadcrumb.block root |> Mina_block.header
×
921
      in
922
      ignore
×
923
        ( List.fold_result ~init:root incoming_transitions
×
924
            ~f:(fun max_acc incoming_transition ->
925
              let With_hash.{ data = header; _ } =
×
926
                Transition_cache.header_with_hash
927
                  (Envelope.Incoming.data @@ fst incoming_transition)
×
928
              in
929
              let header_len h =
×
930
                Mina_block.Header.protocol_state h
×
931
                |> Protocol_state.consensus_state
×
932
                |> Consensus.Data.Consensus_state.blockchain_length
933
              in
934
              let open Result.Let_syntax in
935
              let%map () =
936
                Result.ok_if_true
×
937
                  Mina_numbers.Length.(header_len max_acc <= header_len header)
×
938
                  ~error:
939
                    (Error.of_string
×
940
                       "The blocks are not sorted in increasing order" )
941
              in
942
              header )
×
943
          |> Or_error.ok_exn
×
944
          : Mina_block.Header.t )
945

946
    let%test_unit "sync with one node after receiving a transition" =
947
      Quickcheck.test ~trials:1
×
948
        Fake_network.Generator.(
949
          gen ~precomputed_values ~verifier ~max_frontier_length
×
950
            ~use_super_catchup:false ~ledger_sync_config
951
            [ fresh_peer
952
            ; peer_with_branch
953
                ~frontier_branch_size:((max_frontier_length * 2) + 2)
954
            ])
955
        ~f:(fun fake_network ->
956
          let [ my_net; peer_net ] = fake_network.peer_networks in
×
957
          let transition_reader, transition_writer =
958
            Pipe_lib.Strict_pipe.create ~name:(__MODULE__ ^ __LOC__)
959
              (Buffered (`Capacity 10, `Overflow (Drop_head ignore)))
960
          in
961
          let block =
×
962
            Envelope.Incoming.wrap
963
              ~data:
964
                ( Transition_frontier.best_tip peer_net.state.frontier
×
965
                |> Transition_frontier.Breadcrumb.validated_transition
×
966
                |> Mina_block.Validated.remember
×
967
                |> Mina_block.Validation.reset_frontier_dependencies_validation
×
968
                |> Mina_block.Validation.reset_staged_ledger_diff_validation )
×
969
              ~sender:(Envelope.Sender.Remote peer_net.peer)
970
          in
971
          Pipe_lib.Strict_pipe.Writer.write transition_writer
972
            (`Block block, `Valid_cb None) ;
973
          let new_frontier, sorted_external_transitions =
×
974
            Async.Thread_safe.block_on_async_exn (fun () ->
975
                run_bootstrap
×
976
                  ~timeout_duration:(Block_time.Span.of_ms 30_000L)
×
977
                  ~my_net ~transition_reader )
978
          in
979
          assert_transitions_increasingly_sorted
×
980
            ~root:(Transition_frontier.root new_frontier)
×
981
            sorted_external_transitions ;
982
          [%test_result: Ledger_hash.t]
×
983
            ( Ledger.Db.merkle_root
×
984
            @@ Transition_frontier.root_snarked_ledger new_frontier )
×
985
            ~expect:
986
              ( Ledger.Db.merkle_root
×
987
              @@ Transition_frontier.root_snarked_ledger peer_net.state.frontier
×
988
              ) )
989

990
    let%test_unit "reconstruct staged_ledgers using \
991
                   of_scan_state_and_snarked_ledger" =
992
      Quickcheck.test ~trials:1
×
993
        (Transition_frontier.For_tests.gen ~precomputed_values ~verifier
×
994
           ~max_length:max_frontier_length ~size:max_frontier_length () )
995
        ~f:(fun frontier ->
996
          Thread_safe.block_on_async_exn
×
997
          @@ fun () ->
998
          Deferred.List.iter (Transition_frontier.all_breadcrumbs frontier)
×
999
            ~f:(fun breadcrumb ->
1000
              let staged_ledger =
×
1001
                Transition_frontier.Breadcrumb.staged_ledger breadcrumb
1002
              in
1003
              let expected_merkle_root =
×
1004
                Staged_ledger.ledger staged_ledger |> Ledger.merkle_root
×
1005
              in
1006
              let snarked_ledger =
×
1007
                Transition_frontier.root_snarked_ledger frontier
×
1008
                |> Ledger.of_database
1009
              in
1010
              let snarked_local_state =
×
1011
                Transition_frontier.root frontier
×
1012
                |> Transition_frontier.Breadcrumb.protocol_state
×
1013
                |> Protocol_state.blockchain_state
×
1014
                |> Blockchain_state.snarked_local_state
1015
              in
1016
              let scan_state = Staged_ledger.scan_state staged_ledger in
×
1017
              let get_state hash =
×
1018
                match Transition_frontier.find_protocol_state frontier hash with
×
1019
                | Some protocol_state ->
×
1020
                    Ok protocol_state
1021
                | None ->
×
1022
                    Or_error.errorf
1023
                      !"Protocol state (for scan state transactions) for \
×
1024
                        %{sexp:State_hash.t} not found"
1025
                      hash
1026
              in
1027
              let pending_coinbases =
1028
                Staged_ledger.pending_coinbase_collection staged_ledger
1029
              in
1030
              let%map actual_staged_ledger =
1031
                Staged_ledger.of_scan_state_pending_coinbases_and_snarked_ledger
1032
                  ~scan_state ~logger ~verifier ~constraint_constants
1033
                  ~snarked_ledger ~snarked_local_state ~expected_merkle_root
1034
                  ~pending_coinbases ~get_state
1035
                |> Deferred.Or_error.ok_exn
×
1036
              in
1037
              let height =
×
1038
                Transition_frontier.Breadcrumb.consensus_state breadcrumb
×
1039
                |> Consensus.Data.Consensus_state.blockchain_length
×
1040
                |> Mina_numbers.Length.to_int
1041
              in
1042
              [%test_eq: Staged_ledger_hash.t]
×
1043
                ~message:
1044
                  (sprintf "mismatch of staged ledger hash height %d" height)
×
1045
                (Transition_frontier.Breadcrumb.staged_ledger_hash breadcrumb)
×
1046
                (Staged_ledger.hash actual_staged_ledger) ) )
×
1047

1048
    (*
1049
    let%test_unit "if we see a new transition that is better than the \
1050
                   transition that we are syncing from, than we should \
1051
                   retarget our root" =
1052
      Quickcheck.test ~trials:1
1053
        Fake_network.Generator.(
1054
          gen ~max_frontier_length
1055
            [ fresh_peer
1056
            ; peer_with_branch ~frontier_branch_size:max_frontier_length
1057
            ; peer_with_branch
1058
                ~frontier_branch_size:((max_frontier_length * 2) + 2) ])
1059
        ~f:(fun fake_network ->
1060
          let [me; weaker_chain; stronger_chain] =
1061
            fake_network.peer_networks
1062
          in
1063
          let transition_reader, transition_writer =
1064
            Pipe_lib.Strict_pipe.create ~name:(__MODULE__ ^ __LOC__)
1065
              (Buffered (`Capacity 10, `Overflow Drop_head))
1066
          in
1067
          Envelope.Incoming.wrap
1068
            ~data:
1069
              ( Transition_frontier.best_tip weaker_chain.state.frontier
1070
              |> Transition_frontier.Breadcrumb.validated_transition
1071
              |> Mina_block.Validated.to_initial_validated )
1072
            ~sender:
1073
              (Envelope.Sender.Remote
1074
                 (weaker_chain.peer.host, weaker_chain.peer.peer_id))
1075
          |> Pipe_lib.Strict_pipe.Writer.write transition_writer ;
1076
          Envelope.Incoming.wrap
1077
            ~data:
1078
              ( Transition_frontier.best_tip stronger_chain.state.frontier
1079
              |> Transition_frontier.Breadcrumb.validated_transition
1080
              |> Mina_block.Validated.to_initial_validated )
1081
            ~sender:
1082
              (Envelope.Sender.Remote
1083
                 (stronger_chain.peer.host, stronger_chain.peer.peer_id))
1084
          |> Pipe_lib.Strict_pipe.Writer.write transition_writer ;
1085
          let new_frontier, sorted_external_transitions =
1086
            Async.Thread_safe.block_on_async_exn (fun () ->
1087
                run_bootstrap
1088
                  ~timeout_duration:(Block_time.Span.of_ms 60_000L)
1089
                  ~my_net:me ~transition_reader )
1090
          in
1091
          assert_transitions_increasingly_sorted
1092
            ~root:(Transition_frontier.root new_frontier)
1093
            sorted_external_transitions ;
1094
          [%test_result: Ledger_hash.t]
1095
            ( Ledger.Db.merkle_root
1096
            @@ Transition_frontier.root_snarked_ledger new_frontier )
1097
            ~expect:
1098
              ( Ledger.Db.merkle_root
1099
              @@ Transition_frontier.root_snarked_ledger
1100
                   stronger_chain.state.frontier ) )
1101
*)
1102
  end )
8✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc