• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

MinaProtocol / mina / 2837

30 Oct 2024 07:56AM UTC coverage: 38.267% (-22.8%) from 61.098%
2837

push

buildkite

web-flow
Merge pull request #16306 from MinaProtocol/dkijania/fix_promotion_to_gcr

Fix promotion job PUBLISH misuse

7417 of 19382 relevant lines covered (38.27%)

298565.44 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.58
/src/lib/bootstrap_controller/bootstrap_controller.ml
1
(* Only show stdout for failed inline tests. *)
1✔
2
open Inline_test_quiet_logs
3
open Core
4
open Async
5
open Mina_base
6
module Ledger = Mina_ledger.Ledger
7
module Sync_ledger = Mina_ledger.Sync_ledger
8
open Mina_state
9
open Pipe_lib.Strict_pipe
10
open Network_peer
11

12
(** The environment the bootstrap controller runs in. A first-class module of
    this signature is stored in the [context] field of [t] and is unpacked by
    each function that needs the logger or the constants. *)
module type CONTEXT = sig
  (* Logger used by the [%log ...] statements and trust-system records in
     this file. *)
  val logger : Logger.t

  (* [run] reads the genesis constants, constraint constants, consensus
     constants and genesis protocol state from this value. *)
  val precomputed_values : Precomputed_values.t

  val constraint_constants : Genesis_constants.Constraint_constants.t

  val consensus_constants : Consensus.Constants.t

  val compile_config : Mina_compile_config.t
end
23

24
(** Structured log event emitted by [run] (via [%str_log info]) once the new
    transition frontier has been loaded. *)
type Structured_log_events.t += Bootstrap_complete
  [@@deriving register_event { msg = "Bootstrap state: complete." }]
×
26

27
(** State of a single bootstrap cycle. [run] builds a fresh value at the start
    of every cycle, seeding both mutable blocks with the initial root
    transition. *)
type t =
  { context : (module CONTEXT)
  ; trust_system : Trust_system.t
  ; verifier : Verifier.t
  ; mutable best_seen_transition : Mina_block.initial_valid_block
        (* The "existing" side of the consensus comparison in
           [worth_getting_root]; replaced in [start_sync_job_with_peer]. *)
  ; mutable current_root : Mina_block.initial_valid_block
        (* Root whose snarked ledger hash is the current sync goal; replaced
           in [start_sync_job_with_peer]. *)
  ; network : Mina_networking.t
  ; mutable num_of_root_snarked_ledger_retargeted : int
        (* Incremented each time [Sync_ledger.Db.new_goal] answers [`New]. *)
  }
36

37
type time = Time.Span.t

(** Render a time span as a JSON string of the form ["<seconds> seconds"],
    with the seconds printed using the [%f] format. *)
let time_to_yojson span =
  let seconds = Time.Span.to_sec span in
  let rendered = Printf.sprintf "%f seconds" seconds in
  `String rendered
×
41

42
type opt_time = time option

(** JSON encoding of an optional time span: [`Null] when absent, otherwise
    whatever [time_to_yojson] produces for the span. *)
let opt_time_to_yojson maybe_span =
  Option.value_map maybe_span ~default:`Null ~f:time_to_yojson
49

50
(** An auxiliary data structure for collecting various metrics for the
    bootstrap controller. [run] produces one value per bootstrap cycle
    (failed or successful) and logs the whole list when bootstrap completes. *)
type bootstrap_cycle_stats =
  { cycle_result : string
        (* Free-form outcome of the cycle, e.g. "success". *)
  ; sync_ledger_time : time
        (* Time spent syncing the root snarked ledger. *)
  ; staged_ledger_data_download_time : time
        (* Time spent downloading the scan state and pending coinbases. *)
  ; staged_ledger_construction_time : opt_time
        (* [None] when the cycle failed before construction was timed. *)
  ; local_state_sync_required : bool
  ; local_state_sync_time : opt_time
        (* [None] when the cycle failed before the local state sync step. *)
  }
[@@deriving to_yojson]
60

61
(** Measure how long it takes, from the moment of this call, for [deferred]
    to become determined. Resolves to [(elapsed, value)]. *)
let time_deferred deferred =
  let started_at = Time.now () in
  Deferred.map deferred ~f:(fun value ->
      let finished_at = Time.now () in
      let elapsed = Time.diff finished_at started_at in
      (elapsed, value) )
×
66

67
(** [worth_getting_root t candidate] is [true] exactly when consensus
    selection answers [`Take] for [candidate] against the consensus state of
    [t.best_seen_transition]. The logger handed to the selection hook is
    extended with a ["selection_context"] entry naming this function. *)
let worth_getting_root ({ context = (module Context); _ } as t) candidate =
  let module Selection_context = struct
    include Context

    let logger =
      Logger.extend logger
        [ ( "selection_context"
          , `String "Bootstrap_controller.worth_getting_root" )
        ]
  end in
  let existing =
    With_hash.map ~f:Mina_block.consensus_state
      (Mina_block.Validation.block_with_hash t.best_seen_transition)
  in
  let decision =
    Consensus.Hooks.select
      ~context:(module Selection_context)
      ~existing ~candidate
  in
  Consensus.Hooks.equal_select_status `Take decision
84

85
(** Record a [Violated_protocol] trust action against [host] for an ancestor
    proof that failed verification; the error [e] is attached as metadata.
    The result is whatever [Trust_system.record] returns. *)
let received_bad_proof ({ context = (module Context); _ } as t) host e =
  let details =
    ("Bad ancestor proof: $error", [ ("error", Error_json.error_to_yojson e) ])
  in
  let action = (Trust_system.Actions.Violated_protocol, Some details) in
  Trust_system.record t.trust_system Context.logger host action
×
94

95
(** [true] once [Sync_ledger.Db.peek_valid_tree] reports a tree for
    [root_sync_ledger]. *)
let done_syncing_root root_sync_ledger =
  match Sync_ledger.Db.peek_valid_tree root_sync_ledger with
  | Some _ ->
      true
  | None ->
      false
×
97

98
(** A candidate is worth syncing towards only while the root ledger sync is
    still in progress and [worth_getting_root] accepts the candidate. The
    consensus comparison is skipped when the sync has already finished. *)
let should_sync ~root_sync_ledger t candidate_state =
  if done_syncing_root root_sync_ledger then false
  else worth_getting_root t candidate_state
×
101

102
(** Retarget the root ledger sync at a peer's verified root.

    First records a [Fulfilled_request] trust action for [sender]. Then stores
    [peer_best_tip] and [peer_root] in [t] and asks [Sync_ledger.Db.new_goal]
    to sync towards the snarked ledger hash found in the new root's blockchain
    state. The goal carries the root's state hash, [sender] and the expected
    staged ledger hash; two goals' data are compared by state hash only.

    Resolves to [`Syncing_new_snarked_ledger] (after bumping the retarget
    counter) when the goal is [`New], [`Updating_root_transition] for
    [`Update_data] and [`Ignored] for [`Repeat]. *)
let start_sync_job_with_peer ~sender ~root_sync_ledger
    ({ context = (module Context); _ } as t) peer_best_tip peer_root =
  let trust_action =
    ( Trust_system.Actions.Fulfilled_request
    , Some ("Received verified peer root and best tip", []) )
  in
  let%map () =
    Trust_system.record t.trust_system Context.logger sender trust_action
  in
  t.best_seen_transition <- peer_best_tip ;
  t.current_root <- peer_root ;
  let root_blockchain_state =
    Protocol_state.blockchain_state
      (Mina_block.Header.protocol_state
         (Mina_block.header (Mina_block.Validation.block t.current_root)) )
  in
  let expected_staged_ledger_hash =
    Blockchain_state.staged_ledger_hash root_blockchain_state
  in
  let sync_target =
    Frozen_ledger_hash.to_ledger_hash
      (Blockchain_state.snarked_ledger_hash root_blockchain_state)
  in
  let root_state_hash =
    State_hash.With_state_hashes.state_hash
      (Mina_block.Validation.block_with_hash t.current_root)
  in
  let same_root (left, _, _) (right, _, _) = State_hash.equal left right in
  match
    Sync_ledger.Db.new_goal root_sync_ledger sync_target
      ~data:(root_state_hash, sender, expected_staged_ledger_hash)
      ~equal:same_root
  with
  | `Repeat ->
      `Ignored
  | `Update_data ->
      `Updating_root_transition
  | `New ->
      t.num_of_root_snarked_ledger_retargeted <-
        t.num_of_root_snarked_ledger_retargeted + 1 ;
      `Syncing_new_snarked_ledger
145

146
(** Decide what to do with one incoming transition.

    The candidate's consensus state is first checked with [should_sync]; if it
    is not worth syncing towards, the result is [`Ignored]. Otherwise the
    ancestry proof for the candidate is requested from [sender] and verified
    with [Sync_handler.Root.verify]:

    - if the request fails, the error is logged and the result is [`Ignored];
    - if verification fails, a bad-proof trust action is recorded against
      [sender] (and awaited) and the result is [`Ignored];
    - if verification succeeds and the root ledger sync has not finished in
      the meantime, the sync is retargeted with [start_sync_job_with_peer],
      whose result is returned; otherwise the result is [`Ignored]. *)
let on_transition ({ context = (module Context); _ } as t) ~sender
    ~root_sync_ledger ~genesis_constants candidate_transition =
  let open Context in
  let candidate_consensus_state =
    With_hash.map ~f:Mina_block.consensus_state candidate_transition
  in
  if not @@ should_sync ~root_sync_ledger t candidate_consensus_state then
    Deferred.return `Ignored
  else
    match%bind
      Mina_networking.get_ancestry t.network sender.Peer.peer_id
        (With_hash.map_hash candidate_consensus_state
           ~f:State_hash.State_hashes.state_hash )
    with
    | Error e ->
        [%log error]
          ~metadata:[ ("error", Error_json.error_to_yojson e) ]
          !"Could not get the proof of the root transition from the network: \
            $error" ;
        Deferred.return `Ignored
    | Ok peer_root_with_proof -> (
        match%bind
          Sync_handler.Root.verify
            ~context:(module Context)
            ~verifier:t.verifier ~genesis_constants candidate_consensus_state
            peer_root_with_proof.data
        with
        | Ok (`Root root, `Best_tip best_tip) ->
            (* The sync may have completed while the proof was being fetched
               and verified, so check again before retargeting. *)
            if done_syncing_root root_sync_ledger then return `Ignored
            else
              start_sync_job_with_peer ~sender ~root_sync_ledger t best_tip root
        | Error e ->
            (* Wait for the trust-system record instead of silently dropping
               its deferred, matching how [start_sync_job_with_peer] awaits
               its own trust record. *)
            let%map () = received_bad_proof t sender e in
            `Ignored )
×
184

185
(** Drive the root ledger sync from incoming transitions.

    Connects the sync ledger's query reader and answer writer to the network
    with [Mina_networking.glue_sync_ledger], then consumes
    [sync_ledger_reader]. Every incoming block is added to [transition_graph]
    under its parent state hash; blocks accepted by [worth_getting_root] are
    additionally logged and passed to [on_transition], whose result is
    discarded.

    Each block's sender is obtained with [Envelope.Incoming.remote_sender_exn]
    before the block is cached. *)
let sync_ledger ({ context = (module Context); _ } as t) ~preferred
    ~root_sync_ledger ~transition_graph ~sync_ledger_reader ~genesis_constants =
  let open Context in
  let queries = Sync_ledger.Db.query_reader root_sync_ledger in
  let answers = Sync_ledger.Db.answer_writer root_sync_ledger in
  Mina_networking.glue_sync_ledger ~preferred t.network queries answers ;
  let handle_block (`Block envelope, _) =
    let (block_with_hash, _) : Mina_block.initial_valid_block =
      Envelope.Incoming.data envelope
    in
    let parent =
      Protocol_state.previous_state_hash
        (Mina_block.Header.protocol_state
           (Mina_block.header (With_hash.data block_with_hash)) )
    in
    let sender = Envelope.Incoming.remote_sender_exn envelope in
    Transition_cache.add transition_graph ~parent envelope ;
    (* TODO: Efficiently limiting the number of green threads in #1337 *)
    let candidate =
      With_hash.map ~f:Mina_block.consensus_state block_with_hash
    in
    if not (worth_getting_root t candidate) then Deferred.unit
    else (
      [%log trace] "Added the transition from sync_ledger_reader into cache"
        ~metadata:
          [ ( "state_hash"
            , State_hash.to_yojson
                (State_hash.With_state_hashes.state_hash block_with_hash) )
          ; ( "external_transition"
            , Mina_block.to_yojson (With_hash.data block_with_hash) )
          ] ;
      Deferred.ignore_m
        (on_transition t ~sender ~root_sync_ledger ~genesis_constants
           block_with_hash ) )
  in
  Reader.iter sync_ledger_reader ~f:handle_block
×
225

226
(** Comparison function over blocks-with-hash, derived from consensus
    selection on their consensus states.

    Two blocks with equal state hashes compare as [0]. Otherwise the first
    argument is treated as the existing chain and the second as the candidate:
    the result is [-1] when selection answers [`Keep] and [1] otherwise.

    NOTE(review): an earlier comment here said the logger is set to null to
    avoid log spam, but the context is passed to the selection hook
    unchanged. Confirm which is intended. *)
let external_transition_compare ~context:(module Context : CONTEXT) =
  let compare_consensus_states existing candidate =
    let existing_hash = State_hash.With_state_hashes.state_hash existing in
    let candidate_hash = State_hash.With_state_hashes.state_hash candidate in
    if State_hash.equal existing_hash candidate_hash then 0
    else
      let decision =
        Consensus.Hooks.select ~context:(module Context) ~existing ~candidate
      in
      if Consensus.Hooks.equal_select_status `Keep decision then -1 else 1
  in
  Comparable.lift compare_consensus_states
    ~f:(With_hash.map ~f:Mina_block.consensus_state)
241

242
(** The entry point function for the bootstrap controller. When bootstrap
    finishes it returns a transition frontier with the root breadcrumb and a
    list of transitions collected during bootstrap.

    The bootstrap controller does the following steps to construct the
    transition frontier:
    1. Download the root snarked_ledger.
    2. Download the scan state and pending coinbases.
    3. Construct the staged ledger from the snarked ledger, scan state and
       pending coinbases.
    4. Synchronize the consensus local state if necessary.
    5. Close the old frontier and reload a new one from disk.

    If step 2/3 or step 4 fails, the error is logged, the cycle's pipe writer
    is closed and the whole cycle is retried. Per-cycle timing statistics are
    logged once bootstrap completes.

    Raises (via [failwith]) if the transition frontier cannot be loaded after
    a successful cycle.
 *)
let run ~context:(module Context : CONTEXT) ~trust_system ~verifier ~network
    ~consensus_local_state ~transition_reader ~best_seen_transition
    ~persistent_root ~persistent_frontier ~initial_root_transition ~catchup_mode
    =
  let open Context in
  O1trace.thread "bootstrap" (fun () ->
      let genesis_constants =
        Precomputed_values.genesis_constants precomputed_values
      in
      let constraint_constants = precomputed_values.constraint_constants in
      (* One bootstrap cycle. [previous_cycles] holds the stats of earlier,
         failed cycles; the stats of this cycle are prepended to it. *)
      let rec loop previous_cycles =
        let sync_ledger_pipe = "sync ledger pipe" in
        (* Buffered pipe of capacity 50 with a [Drop_head] overflow policy;
           every dropped block is counted in the metrics and passed to
           [Mina_block.handle_dropped_transition]. *)
        let sync_ledger_reader, sync_ledger_writer =
          create ~name:sync_ledger_pipe
            (Buffered
               ( `Capacity 50
               , `Overflow
                   (Drop_head
                      (fun ( `Block
                               (block :
                                 Mina_block.Validation.initial_valid_with_block
                                 Envelope.Incoming.t )
                           , `Valid_cb valid_cb ) ->
                        Mina_metrics.(
                          Counter.inc_one
                            Pipe.Drop_on_overflow.bootstrap_sync_ledger) ;
                        Mina_block.handle_dropped_transition ?valid_cb
                          ( With_hash.hash
                          @@ Mina_block.Validation.block_with_hash
                          @@ Envelope.Incoming.data block )
                          ~pipe_name:sync_ledger_pipe ~logger ) ) ) )
        in
        (* Forward incoming transitions into this cycle's pipe in the
           background. *)
        don't_wait_for
          (transfer_while_writer_alive transition_reader sync_ledger_writer
             ~f:Fn.id ) ;
        let initial_root_transition =
          initial_root_transition |> Mina_block.Validated.remember
          |> Mina_block.Validation.reset_frontier_dependencies_validation
          |> Mina_block.Validation.reset_staged_ledger_diff_validation
        in
        (* Fresh per-cycle state, seeded with the initial root transition. *)
        let t =
          { network
          ; context = (module Context)
          ; trust_system
          ; verifier
          ; best_seen_transition = initial_root_transition
          ; current_root = initial_root_transition
          ; num_of_root_snarked_ledger_retargeted = 0
          }
        in
        let transition_graph = Transition_cache.create () in
        let temp_persistent_root_instance =
          Transition_frontier.Persistent_root.create_instance_exn
            persistent_root
        in
        let temp_snarked_ledger =
          Transition_frontier.Persistent_root.Instance.snarked_ledger
            temp_persistent_root_instance
        in
        (* step 1. download snarked_ledger *)
        let%bind sync_ledger_time, (hash, sender, expected_staged_ledger_hash) =
          time_deferred
            (let root_sync_ledger =
               Sync_ledger.Db.create temp_snarked_ledger ~logger ~trust_system
             in
             don't_wait_for
               (sync_ledger t
                  ~preferred:
                    ( Option.to_list best_seen_transition
                    |> List.filter_map ~f:(fun x ->
                           match Envelope.Incoming.sender x with
                           | Local ->
                               None
                           | Remote r ->
                               Some r ) )
                  ~root_sync_ledger ~transition_graph ~sync_ledger_reader
                  ~genesis_constants ) ;
             (* We ignore the resulting ledger returned here since it will always
                * be the same as the ledger we started with because we are syncing
                * a db ledger. *)
             let%map _, data = Sync_ledger.Db.valid_tree root_sync_ledger in
             Sync_ledger.Db.destroy root_sync_ledger ;
             data )
        in
        Mina_metrics.(
          Counter.inc Bootstrap.root_snarked_ledger_sync_ms
            Time.Span.(to_ms sync_ledger_time)) ;
        Mina_metrics.(
          Gauge.set Bootstrap.num_of_root_snarked_ledger_retargeted
            (Float.of_int t.num_of_root_snarked_ledger_retargeted)) ;
        (* step 2. Download scan state and pending coinbases. *)
        let%bind ( staged_ledger_data_download_time
                 , staged_ledger_construction_time
                 , staged_ledger_aux_result ) =
          let%bind ( staged_ledger_data_download_time
                   , staged_ledger_data_download_result ) =
            time_deferred
              (Mina_networking
               .get_staged_ledger_aux_and_pending_coinbases_at_hash t.network
                 sender.peer_id hash )
          in
          match staged_ledger_data_download_result with
          | Error err ->
              Deferred.return (staged_ledger_data_download_time, None, Error err)
          | Ok
              ( scan_state
              , expected_merkle_root
              , pending_coinbases
              , protocol_states ) -> (
              let%map staged_ledger_construction_result =
                O1trace.thread "construct_root_staged_ledger" (fun () ->
                    let open Deferred.Or_error.Let_syntax in
                    let received_staged_ledger_hash =
                      Staged_ledger_hash.of_aux_ledger_and_coinbase_hash
                        (Staged_ledger.Scan_state.hash scan_state)
                        expected_merkle_root pending_coinbases
                    in
                    [%log debug]
                      ~metadata:
                        [ ( "expected_staged_ledger_hash"
                          , Staged_ledger_hash.to_yojson
                              expected_staged_ledger_hash )
                        ; ( "received_staged_ledger_hash"
                          , Staged_ledger_hash.to_yojson
                              received_staged_ledger_hash )
                        ]
                      "Comparing $expected_staged_ledger_hash to \
                       $received_staged_ledger_hash" ;
                    (* Validate the current root against the staged ledger hash
                       computed from the received data; any validation error is
                       reported as a faulty scan state. *)
                    let%bind new_root =
                      t.current_root
                      |> Mina_block.Validation
                         .skip_frontier_dependencies_validation
                           `This_block_belongs_to_a_detached_subtree
                      |> Mina_block.Validation.validate_staged_ledger_hash
                           (`Staged_ledger_already_materialized
                             received_staged_ledger_hash )
                      |> Result.map_error ~f:(fun _ ->
                             Error.of_string
                               "received faulty scan state from peer" )
                      |> Deferred.return
                    in
                    let protocol_states =
                      List.map protocol_states
                        ~f:(With_hash.of_data ~hash_data:Protocol_state.hashes)
                    in
                    let%bind protocol_states =
                      Staged_ledger.Scan_state.check_required_protocol_states
                        scan_state ~protocol_states
                      |> Deferred.return
                    in
                    let protocol_states_map =
                      protocol_states
                      |> List.map ~f:(fun ps ->
                             (State_hash.With_state_hashes.state_hash ps, ps) )
                      |> State_hash.Map.of_alist_exn
                    in
                    (* Look up a protocol state by hash among the states
                       received from the peer; a missing state is logged and
                       returned as an error. *)
                    let get_state hash =
                      match Map.find protocol_states_map hash with
                      | None ->
                          let new_state_hash =
                            State_hash.With_state_hashes.state_hash
                              (fst new_root)
                          in
                          [%log error]
                            ~metadata:
                              [ ("new_root", State_hash.to_yojson new_state_hash)
                              ; ("state_hash", State_hash.to_yojson hash)
                              ]
                            "Protocol state (for scan state transactions) for \
                             $state_hash not found when bootstrapping to the \
                             new root $new_root" ;
                          Or_error.errorf
                            !"Protocol state (for scan state transactions) for \
                              %{sexp:State_hash.t} not found when \
                              bootstrapping to the new root \
                              %{sexp:State_hash.t}"
                            hash new_state_hash
                      | Some protocol_state ->
                          Ok (With_hash.data protocol_state)
                    in
                    (* step 3. Construct staged ledger from snarked ledger, scan state
                       and pending coinbases. *)
                    (* Construct the staged ledger before constructing the transition
                     * frontier in order to verify the scan state we received.
                     * TODO: reorganize the code to avoid doing this twice (#3480) *)
                    let open Deferred.Let_syntax in
                    let%map staged_ledger_construction_time, construction_result
                        =
                      time_deferred
                        (let open Deferred.Let_syntax in
                        let temp_mask =
                          Ledger.of_database temp_snarked_ledger
                        in
                        let%map result =
                          Staged_ledger
                          .of_scan_state_pending_coinbases_and_snarked_ledger
                            ~logger
                            ~snarked_local_state:
                              Mina_block.(
                                t.current_root |> Validation.block |> header
                                |> Header.protocol_state
                                |> Protocol_state.blockchain_state
                                |> Blockchain_state.snarked_local_state)
                            ~verifier ~constraint_constants ~scan_state
                            ~snarked_ledger:temp_mask ~expected_merkle_root
                            ~pending_coinbases ~get_state
                        in
                        (* The constructed staged ledger itself is not kept;
                           only success or failure matters here. The temporary
                           mask is unregistered either way. *)
                        ignore
                          ( Ledger.Maskable.unregister_mask_exn ~loc:__LOC__
                              temp_mask
                            : Ledger.unattached_mask ) ;
                        Result.map result
                          ~f:
                            (const
                               ( scan_state
                               , pending_coinbases
                               , new_root
                               , protocol_states ) ))
                    in
                    Ok (staged_ledger_construction_time, construction_result) )
              in
              match staged_ledger_construction_result with
              | Error err ->
                  (staged_ledger_data_download_time, None, Error err)
              | Ok (staged_ledger_construction_time, result) ->
                  ( staged_ledger_data_download_time
                  , Some staged_ledger_construction_time
                  , result ) )
        in
        (* The temporary root instance is closed whether or not the staged
           ledger data was obtained and verified. *)
        Transition_frontier.Persistent_root.Instance.close
          temp_persistent_root_instance ;
        match staged_ledger_aux_result with
        | Error e ->
            let%bind () =
              Trust_system.(
                record t.trust_system logger sender
                  Actions.
                    ( Outgoing_connection_error
                    , Some
                        ( "Can't find scan state from the peer or received \
                           faulty scan state from the peer."
                        , [] ) ))
            in
            [%log error]
              ~metadata:
                [ ("error", Error_json.error_to_yojson e)
                ; ("state_hash", State_hash.to_yojson hash)
                ; ( "expected_staged_ledger_hash"
                  , Staged_ledger_hash.to_yojson expected_staged_ledger_hash )
                ]
              "Failed to find scan state for the transition with hash \
               $state_hash from the peer or received faulty scan state: \
               $error. Retry bootstrap" ;
            Writer.close sync_ledger_writer ;
            let this_cycle =
              { cycle_result = "failed to download and construct scan state"
              ; sync_ledger_time
              ; staged_ledger_data_download_time
              ; staged_ledger_construction_time
              ; local_state_sync_required = false
              ; local_state_sync_time = None
              }
            in
            loop (this_cycle :: previous_cycles)
        | Ok (scan_state, pending_coinbase, new_root, protocol_states) -> (
            let%bind () =
              Trust_system.(
                record t.trust_system logger sender
                  Actions.
                    ( Fulfilled_request
                    , Some ("Received valid scan state from peer", []) ))
            in
            let best_seen_block_with_hash, _ = t.best_seen_transition in
            let consensus_state =
              With_hash.data best_seen_block_with_hash
              |> Mina_block.header |> Mina_block.Header.protocol_state
              |> Protocol_state.consensus_state
            in
            (* step 4. Synchronize consensus local state if necessary *)
            let%bind ( local_state_sync_time
                     , (local_state_sync_required, local_state_sync_result) ) =
              time_deferred
                ( match
                    Consensus.Hooks.required_local_state_sync
                      ~constants:precomputed_values.consensus_constants
                      ~consensus_state ~local_state:consensus_local_state
                  with
                | None ->
                    [%log debug]
                      ~metadata:
                        [ ( "local_state"
                          , Consensus.Data.Local_state.to_yojson
                              consensus_local_state )
                        ; ( "consensus_state"
                          , Consensus.Data.Consensus_state.Value.to_yojson
                              consensus_state )
                        ]
                      "Not synchronizing consensus local state" ;
                    Deferred.return (false, Or_error.return ())
                | Some sync_jobs ->
                    [%log info] "Synchronizing consensus local state" ;
                    let%map result =
                      Consensus.Hooks.sync_local_state
                        ~context:(module Context)
                        ~local_state:consensus_local_state ~trust_system
                        ~glue_sync_ledger:
                          (Mina_networking.glue_sync_ledger t.network)
                        sync_jobs
                    in
                    (true, result) )
            in
            match local_state_sync_result with
            | Error e ->
                [%log error]
                  ~metadata:[ ("error", Error_json.error_to_yojson e) ]
                  "Local state sync failed: $error. Retry bootstrap" ;
                Writer.close sync_ledger_writer ;
                let this_cycle =
                  { cycle_result = "failed to synchronize local state"
                  ; sync_ledger_time
                  ; staged_ledger_data_download_time
                  ; staged_ledger_construction_time
                  ; local_state_sync_required
                  ; local_state_sync_time = Some local_state_sync_time
                  }
                in
                loop (this_cycle :: previous_cycles)
            | Ok () ->
                (* step 5. Close the old frontier and reload a new one from disk. *)
                let new_root_data : Transition_frontier.Root_data.Limited.t =
                  Transition_frontier.Root_data.Limited.create
                    ~transition:(Mina_block.Validated.lift new_root)
                    ~scan_state ~pending_coinbase ~protocol_states
                in
                let%bind () =
                  Transition_frontier.Persistent_frontier.reset_database_exn
                    persistent_frontier ~root_data:new_root_data
                    ~genesis_state_hash:
                      (State_hash.With_state_hashes.state_hash
                         precomputed_values.protocol_state_with_hashes )
                in
                (* TODO: lazy load db in persistent root to avoid unnecessary opens like this *)
                Transition_frontier.Persistent_root.(
                  with_instance_exn persistent_root ~f:(fun instance ->
                      Instance.set_root_state_hash instance
                      @@ Mina_block.Validated.state_hash
                      @@ Mina_block.Validated.lift new_root )) ;
                let%map new_frontier =
                  let fail msg =
                    failwith
                      ( "failed to initialize transition frontier after \
                         bootstrapping: " ^ msg )
                  in
                  Transition_frontier.load
                    ~context:(module Context)
                    ~retry_with_fresh_db:false ~verifier ~consensus_local_state
                    ~persistent_root ~persistent_frontier ~catchup_mode ()
                  >>| function
                  | Ok frontier ->
                      frontier
                  | Error (`Failure msg) ->
                      fail msg
                  | Error `Bootstrap_required ->
                      fail
                        "bootstrap still required (indicates logical error in \
                         code)"
                  | Error `Persistent_frontier_malformed ->
                      fail "persistent frontier was malformed"
                  | Error `Snarked_ledger_mismatch ->
                      fail
                        "this should not happen, because we just reset the \
                         snarked_ledger"
                in
                [%str_log info] Bootstrap_complete ;
                let collected_transitions =
                  Transition_cache.data transition_graph
                in
                let logger =
                  Logger.extend logger
                    [ ( "context"
                      , `String "Filter collected transitions in bootstrap" )
                    ]
                in
                let root_consensus_state =
                  Transition_frontier.(
                    Breadcrumb.consensus_state_with_hashes (root new_frontier))
                in
                (* Keep only the collected transitions that consensus selection
                   prefers over the root of the new frontier. *)
                let filtered_collected_transitions =
                  List.filter collected_transitions
                    ~f:(fun incoming_transition ->
                      let transition =
                        Envelope.Incoming.data incoming_transition
                        |> Mina_block.Validation.block_with_hash
                      in
                      Consensus.Hooks.equal_select_status `Take
                      @@ Consensus.Hooks.select
                           ~context:(module Context)
                           ~existing:root_consensus_state
                           ~candidate:
                             (With_hash.map ~f:Mina_block.consensus_state
                                transition ) )
                in
                [%log debug] "Sorting filtered transitions by consensus state"
                  ~metadata:[] ;
                let sorted_filtered_collected_transitions =
                  O1trace.sync_thread "sorting_collected_transitions" (fun () ->
                      List.sort filtered_collected_transitions
                        ~compare:
                          (Comparable.lift
                             ~f:
                               (Fn.compose Mina_block.Validation.block_with_hash
                                  Envelope.Incoming.data )
                             (external_transition_compare
                                ~context:(module Context) ) ) )
                in
                let this_cycle =
                  { cycle_result = "success"
                  ; sync_ledger_time
                  ; staged_ledger_data_download_time
                  ; staged_ledger_construction_time
                  ; local_state_sync_required
                  ; local_state_sync_time = Some local_state_sync_time
                  }
                in
                ( this_cycle :: previous_cycles
                , (new_frontier, sorted_filtered_collected_transitions) ) )
      in
      (* Time the whole bootstrap, including any retried cycles. *)
      let%map time_elapsed, (cycles, result) = time_deferred (loop []) in
      [%log info] "Bootstrap completed in $time_elapsed: $bootstrap_stats"
        ~metadata:
          [ ("time_elapsed", time_to_yojson time_elapsed)
          ; ( "bootstrap_stats"
            , `List (List.map ~f:bootstrap_cycle_stats_to_yojson cycles) )
          ] ;
      Mina_metrics.(
        Gauge.set Bootstrap.bootstrap_time_ms
          Core.Time.(Span.to_ms @@ time_elapsed)) ;
      result )
693

694
let%test_module "Bootstrap_controller tests" =
695
  ( module struct
696
    open Pipe_lib
697

698
    (* Frontier length bound used by the tests below, obtained by applying
       Transition_frontier.global_max_length to the unit-test genesis
       constants. *)
    let max_frontier_length =
      Genesis_constants.For_unit_tests.t |> Transition_frontier.global_max_length

    (* Logger shared by the fixtures and helpers of this test module. *)
    let logger = Logger.create ()
702

703
    (* Null trust system for the tests. A background reader consumes every
       item of its upcall pipe and discards it. *)
    let trust_system =
      let null_trust_system = Trust_system.null () in
      let upcalls = Trust_system.upcall_pipe null_trust_system in
      don't_wait_for
        (Pipe_lib.Strict_pipe.Reader.iter upcalls ~f:(fun _ -> Deferred.unit)) ;
      null_trust_system
710

711
    (* Precomputed values for unit tests, forced once when the module is
       initialised. *)
    let precomputed_values = Precomputed_values.for_unit_tests |> Lazy.force

    (* Proof level and constraint constants are both read from the
       precomputed values above. *)
    let proof_level = precomputed_values.proof_level

    let constraint_constants = precomputed_values.constraint_constants

    (* Compile-time configuration dedicated to unit tests. *)
    let compile_config = Mina_compile_config.For_unit_tests.t
718

719
    (* Test instance of the values required by the CONTEXT signature. It is
       packed as a first-class module wherever a ~context argument is needed. *)
    module Context = struct
      (* A fresh logger, distinct from the logger fixture of the enclosing
         test module. *)
      let logger = Logger.create ()

      let precomputed_values = precomputed_values

      (* Taken directly from the unit-test genesis constants rather than from
         precomputed_values. *)
      let constraint_constants =
        Genesis_constants.For_unit_tests.Constraint_constants.t

      let consensus_constants = precomputed_values.consensus_constants

      let compile_config = compile_config
    end
731

732
    (* Verifier shared by all tests. Creation is asynchronous, so module
       initialisation blocks until the verifier is available. *)
    let verifier =
      let create_verifier () =
        let pids = Child_processes.Termination.create_pid_table () in
        Verifier.create ~logger ~proof_level ~constraint_constants
          ~conf_dir:None ~pids ~commit_id:"not specified for unit tests" ()
      in
      Async.Thread_safe.block_on_async_exn create_verifier

    (* Genesis ledger module unpacked from the precomputed values. *)
    module Genesis_ledger = (val precomputed_values.genesis_ledger)
740

741
    (* Turns a validated transition into an incoming envelope attributed to the
       remote peer [sender]. The frontier-dependencies and staged-ledger-diff
       validations are reset before wrapping. *)
    let downcast_transition ~sender transition =
      let remembered = Mina_block.Validated.remember transition in
      let without_frontier_dependencies =
        Mina_block.Validation.reset_frontier_dependencies_validation remembered
      in
      let data =
        Mina_block.Validation.reset_staged_ledger_diff_validation
          without_frontier_dependencies
      in
      Envelope.Incoming.wrap ~data ~sender:(Envelope.Sender.Remote sender)
749

750
    (* Same as [downcast_transition], starting from a breadcrumb: its validated
       transition is extracted and then downcast. *)
    let downcast_breadcrumb ~sender breadcrumb =
      breadcrumb
      |> Transition_frontier.Breadcrumb.validated_transition
      |> downcast_transition ~sender
753

754
    (* Builds a bootstrap controller record without starting it. Both the best
       seen transition and the current root are set to [genesis_root] (with
       its frontier-dependencies and staged-ledger-diff validations reset),
       and the retarget counter starts at zero. *)
    let make_non_running_bootstrap ~genesis_root ~network =
      let root_transition =
        Mina_block.Validation.reset_staged_ledger_diff_validation
          (Mina_block.Validation.reset_frontier_dependencies_validation
             genesis_root )
      in
      { context = (module Context)
      ; trust_system
      ; verifier
      ; best_seen_transition = root_transition
      ; current_root = root_transition
      ; network
      ; num_of_root_snarked_ledger_retargeted = 0
      }
768

769
    (* Writes a generated branch of blocks into the sync-ledger pipe and checks
       that the transition cache afterwards holds exactly those blocks. *)
    let%test_unit "Bootstrap controller caches all transitions it is passed \
                   through the transition_reader" =
      let branch_size = (max_frontier_length * 2) + 2 in
      (* Generates a two-peer fake network together with a branch of
         [branch_size] breadcrumbs rooted at the first peer's frontier root. *)
      let network_and_branch_gen =
        let open Quickcheck.Generator.Let_syntax in
        (* we only need one node for this test, but we need more than one peer so that mina_networking does not throw an error *)
        let%bind fake_network =
          Fake_network.Generator.(
            gen ~precomputed_values ~verifier ~max_frontier_length
              ~compile_config [ fresh_peer; fresh_peer ]
              ~use_super_catchup:false)
        in
        let%map make_branch =
          Transition_frontier.Breadcrumb.For_tests.gen_seq ~precomputed_values
            ~verifier
            ~accounts_with_secret_keys:(Lazy.force Genesis_ledger.accounts)
            branch_size
        in
        let [ me; _ ] = fake_network.peer_networks in
        let branch =
          Async.Thread_safe.block_on_async_exn (fun () ->
              make_branch (Transition_frontier.root me.state.frontier) )
        in
        (fake_network, branch)
      in
      Quickcheck.test ~trials:1 network_and_branch_gen
        ~f:(fun (fake_network, branch) ->
          let [ me; other ] = fake_network.peer_networks in
          let genesis_root =
            Transition_frontier.root me.state.frontier
            |> Transition_frontier.Breadcrumb.validated_transition
            |> Mina_block.Validated.remember
          in
          let transition_graph = Transition_cache.create () in
          let sync_ledger_reader, sync_ledger_writer =
            Pipe_lib.Strict_pipe.create ~name:"sync_ledger_reader" Synchronous
          in
          let bootstrap =
            make_non_running_bootstrap ~genesis_root ~network:me.network
          in
          let root_sync_ledger =
            let snarked_ledger =
              Transition_frontier.root_snarked_ledger me.state.frontier
            in
            Sync_ledger.Db.create snarked_ledger ~logger ~trust_system
          in
          Async.Thread_safe.block_on_async_exn (fun () ->
              (* Start the consumer first, then feed it the whole branch and
                 close the pipe so that the consumer can finish. *)
              let sync_deferred =
                sync_ledger bootstrap ~root_sync_ledger ~transition_graph
                  ~preferred:[] ~sync_ledger_reader
                  ~genesis_constants:Genesis_constants.For_unit_tests.t
              in
              (* NOTE(review): the tag below is spelled `Vallid_cb, while the
                 other test in this module writes `Valid_cb. Confirm which
                 spelling sync_ledger expects before renaming it. *)
              let send_block breadcrumb =
                Strict_pipe.Writer.write sync_ledger_writer
                  ( `Block (downcast_breadcrumb ~sender:other.peer breadcrumb)
                  , `Vallid_cb None )
              in
              let%bind () = Deferred.List.iter branch ~f:send_block in
              Strict_pipe.Writer.close sync_ledger_writer ;
              sync_deferred ) ;
          let expected_transitions =
            List.map branch ~f:(fun breadcrumb ->
                breadcrumb
                |> Transition_frontier.Breadcrumb.validated_transition
                |> Mina_block.Validated.remember
                |> Mina_block.Validation.block_with_hash )
          in
          let saved_transitions =
            List.map (Transition_cache.data transition_graph)
              ~f:(fun envelope ->
                Mina_block.Validation.block_with_hash
                  (Envelope.Incoming.data envelope) )
          in
          (* Blocks are compared with external_transition_compare, so both
             lists are turned into sets ordered by that comparison. *)
          let module Cached_block = struct
            module T = struct
              type t = Mina_block.t State_hash.With_state_hashes.t
              [@@deriving sexp]

              let compare = external_transition_compare ~context:(module Context)
            end

            include Comparable.Make (T)
          end in
          [%test_result: Cached_block.Set.t]
            (Cached_block.Set.of_list saved_transitions)
            ~expect:(Cached_block.Set.of_list expected_transitions) )
854

855
    (* Closes the frontier of [my_net] and then runs the bootstrap controller
       against its persistent root and persistent frontier, reading blocks
       from [transition_reader]. The run is awaited through
       Block_time.Timeout.await_exn with [timeout_duration]. *)
    let run_bootstrap ~timeout_duration ~my_net ~transition_reader =
      let open Fake_network in
      let time_controller = Block_time.Controller.basic ~logger in
      let frontier = my_net.state.frontier in
      let persistent_root = Transition_frontier.persistent_root frontier in
      let persistent_frontier =
        Transition_frontier.persistent_frontier frontier
      in
      (* Captured before the frontier is closed below. *)
      let initial_root_transition =
        Transition_frontier.root frontier
        |> Transition_frontier.Breadcrumb.validated_transition
      in
      let%bind () = Transition_frontier.close ~loc:__LOC__ frontier in
      [%log info] "bootstrap begin" ;
      let bootstrap_result =
        run
          ~context:(module Context)
          ~trust_system ~verifier ~network:my_net.network
          ~best_seen_transition:None
          ~consensus_local_state:my_net.state.consensus_local_state
          ~transition_reader ~persistent_root ~persistent_frontier
          ~catchup_mode:`Normal ~initial_root_transition
      in
      Block_time.Timeout.await_exn time_controller ~timeout_duration
        bootstrap_result
880

881
    (* Checks that blockchain lengths never decrease along
       [incoming_transitions], starting from the block of [root]. Raises, via
       Or_error.ok_exn, at the first block that is shorter than its
       predecessor. *)
    let assert_transitions_increasingly_sorted ~root
        (incoming_transitions :
          Mina_block.initial_valid_block Envelope.Incoming.t list ) =
      (* Accepts the next block when it is at least as long as the previous one
         and makes it the new reference point. *)
      let check_next previous incoming_transition =
        let With_hash.{ data = block; _ }, _ =
          Envelope.Incoming.data incoming_transition
        in
        let is_non_decreasing =
          Mina_numbers.Length.(
            Mina_block.blockchain_length previous
            <= Mina_block.blockchain_length block)
        in
        if is_non_decreasing then Ok block
        else
          Error (Error.of_string "The blocks are not sorted in increasing order")
      in
      let root_block = Transition_frontier.Breadcrumb.block root in
      let (_ : Mina_block.t) =
        List.fold_result incoming_transitions ~init:root_block ~f:check_next
        |> Or_error.ok_exn
      in
      ()
904

905
    (* Bootstraps a fresh peer from a peer holding a long branch, after the
       best tip of that peer has been written to the transition pipe. The
       returned transitions must be sorted and the snarked ledger roots of the
       new frontier and of the peer must agree. *)
    let%test_unit "sync with one node after receiving a transition" =
      let network_gen =
        Fake_network.Generator.(
          gen ~precomputed_values ~verifier ~max_frontier_length
            ~use_super_catchup:false ~compile_config
            [ fresh_peer
            ; peer_with_branch
                ~frontier_branch_size:((max_frontier_length * 2) + 2)
            ])
      in
      (* Merkle root of the root snarked ledger of a frontier. *)
      let snarked_ledger_root frontier =
        Ledger.Db.merkle_root (Transition_frontier.root_snarked_ledger frontier)
      in
      Quickcheck.test ~trials:1 network_gen ~f:(fun fake_network ->
          let [ my_net; peer_net ] = fake_network.peer_networks in
          let transition_reader, transition_writer =
            Pipe_lib.Strict_pipe.create ~name:(__MODULE__ ^ __LOC__)
              (Buffered (`Capacity 10, `Overflow (Drop_head ignore)))
          in
          (* The best tip of the peer, downcast and attributed to that peer. *)
          let best_tip_block =
            Transition_frontier.best_tip peer_net.state.frontier
            |> downcast_breadcrumb ~sender:peer_net.peer
          in
          Pipe_lib.Strict_pipe.Writer.write transition_writer
            (`Block best_tip_block, `Valid_cb None) ;
          let bootstrap () =
            run_bootstrap
              ~timeout_duration:(Block_time.Span.of_ms 30_000L)
              ~my_net ~transition_reader
          in
          let new_frontier, sorted_external_transitions =
            Async.Thread_safe.block_on_async_exn bootstrap
          in
          assert_transitions_increasingly_sorted
            ~root:(Transition_frontier.root new_frontier)
            sorted_external_transitions ;
          [%test_result: Ledger_hash.t]
            (snarked_ledger_root new_frontier)
            ~expect:(snarked_ledger_root peer_net.state.frontier) )
948

949
    (* For every breadcrumb of a generated frontier, rebuilds a staged ledger
       from its scan state, its pending coinbases and the root snarked ledger,
       then checks that the rebuilt ledger hashes to the same value as the
       original one. *)
    let%test_unit "reconstruct staged_ledgers using \
                   of_scan_state_and_snarked_ledger" =
      let frontier_gen =
        Transition_frontier.For_tests.gen ~precomputed_values ~verifier
          ~max_length:max_frontier_length ~size:max_frontier_length ()
      in
      Quickcheck.test ~trials:1 frontier_gen ~f:(fun frontier ->
          let check_breadcrumb breadcrumb =
            let staged_ledger =
              Transition_frontier.Breadcrumb.staged_ledger breadcrumb
            in
            let expected_merkle_root =
              Ledger.merkle_root (Staged_ledger.ledger staged_ledger)
            in
            let snarked_ledger =
              Ledger.of_database
                (Transition_frontier.root_snarked_ledger frontier)
            in
            let snarked_local_state =
              let root_protocol_state =
                Transition_frontier.Breadcrumb.protocol_state
                  (Transition_frontier.root frontier)
              in
              Blockchain_state.snarked_local_state
                (Protocol_state.blockchain_state root_protocol_state)
            in
            let scan_state = Staged_ledger.scan_state staged_ledger in
            (* Looks protocol states up in the frontier and reports a missing
               one as an error. *)
            let get_state hash =
              match Transition_frontier.find_protocol_state frontier hash with
              | None ->
                  Or_error.errorf
                    !"Protocol state (for scan state transactions) for \
                      %{sexp:State_hash.t} not found"
                    hash
              | Some protocol_state ->
                  Ok protocol_state
            in
            let pending_coinbases =
              Staged_ledger.pending_coinbase_collection staged_ledger
            in
            let%map rebuilt_staged_ledger =
              Deferred.Or_error.ok_exn
                (Staged_ledger.of_scan_state_pending_coinbases_and_snarked_ledger
                   ~scan_state ~logger ~verifier ~constraint_constants
                   ~snarked_ledger ~snarked_local_state ~expected_merkle_root
                   ~pending_coinbases ~get_state )
            in
            assert (
              Staged_ledger_hash.equal
                (Staged_ledger.hash staged_ledger)
                (Staged_ledger.hash rebuilt_staged_ledger) )
          in
          Thread_safe.block_on_async_exn (fun () ->
              Deferred.List.iter
                (Transition_frontier.all_breadcrumbs frontier)
                ~f:check_breadcrumb ) )
1000

1001
    (*
1002
    let%test_unit "if we see a new transition that is better than the \
1003
                   transition that we are syncing from, than we should \
1004
                   retarget our root" =
1005
      Quickcheck.test ~trials:1
1006
        Fake_network.Generator.(
1007
          gen ~max_frontier_length
1008
            [ fresh_peer
1009
            ; peer_with_branch ~frontier_branch_size:max_frontier_length
1010
            ; peer_with_branch
1011
                ~frontier_branch_size:((max_frontier_length * 2) + 2) ])
1012
        ~f:(fun fake_network ->
1013
          let [me; weaker_chain; stronger_chain] =
1014
            fake_network.peer_networks
1015
          in
1016
          let transition_reader, transition_writer =
1017
            Pipe_lib.Strict_pipe.create ~name:(__MODULE__ ^ __LOC__)
1018
              (Buffered (`Capacity 10, `Overflow Drop_head))
1019
          in
1020
          Envelope.Incoming.wrap
1021
            ~data:
1022
              ( Transition_frontier.best_tip weaker_chain.state.frontier
1023
              |> Transition_frontier.Breadcrumb.validated_transition
1024
              |> Mina_block.Validated.to_initial_validated )
1025
            ~sender:
1026
              (Envelope.Sender.Remote
1027
                 (weaker_chain.peer.host, weaker_chain.peer.peer_id))
1028
          |> Pipe_lib.Strict_pipe.Writer.write transition_writer ;
1029
          Envelope.Incoming.wrap
1030
            ~data:
1031
              ( Transition_frontier.best_tip stronger_chain.state.frontier
1032
              |> Transition_frontier.Breadcrumb.validated_transition
1033
              |> Mina_block.Validated.to_initial_validated )
1034
            ~sender:
1035
              (Envelope.Sender.Remote
1036
                 (stronger_chain.peer.host, stronger_chain.peer.peer_id))
1037
          |> Pipe_lib.Strict_pipe.Writer.write transition_writer ;
1038
          let new_frontier, sorted_external_transitions =
1039
            Async.Thread_safe.block_on_async_exn (fun () ->
1040
                run_bootstrap
1041
                  ~timeout_duration:(Block_time.Span.of_ms 60_000L)
1042
                  ~my_net:me ~transition_reader )
1043
          in
1044
          assert_transitions_increasingly_sorted
1045
            ~root:(Transition_frontier.root new_frontier)
1046
            sorted_external_transitions ;
1047
          [%test_result: Ledger_hash.t]
1048
            ( Ledger.Db.merkle_root
1049
            @@ Transition_frontier.root_snarked_ledger new_frontier )
1050
            ~expect:
1051
              ( Ledger.Db.merkle_root
1052
              @@ Transition_frontier.root_snarked_ledger
1053
                   stronger_chain.state.frontier ) )
1054
*)
1055
  end )
2✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc