• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

MinaProtocol / mina / 2910

16 Nov 2024 09:13AM UTC coverage: 38.876% (+2.1%) from 36.73%
2910

Pull #16345

buildkite

dkijania
Merge branch 'compatible' into dkijania/merge/compatible_to_develop_16_11_24
Pull Request #16345: merge compatible to develop 16 11 24

15 of 40 new or added lines in 14 files covered. (37.5%)

9 existing lines in 5 files now uncovered.

26157 of 67283 relevant lines covered (38.88%)

20732.42 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

15.35
/src/lib/transition_handler/processor.ml
1
(** This module contains the transition processor. The transition processor is
2
 *  the thread in which transitions are attached the to the transition frontier.
3
 *
4
 *  Two types of data are handled by the transition processor: validated external transitions
5
 *  with precomputed state hashes (via the block producer and validator pipes)
6
 *  and breadcrumb rose trees (via the catchup pipe).
7
 *)
8

9
(* Only show stdout for failed inline tests. *)
2✔
10
open Inline_test_quiet_logs
11
open Core_kernel
12
open Async_kernel
13
open Pipe_lib.Strict_pipe
14
open Mina_base
15
open Mina_state
16
open Cache_lib
17
open Mina_block
18
open Network_peer
19

20
module type CONTEXT = sig
21
  val logger : Logger.t
22

23
  val precomputed_values : Precomputed_values.t
24

25
  val constraint_constants : Genesis_constants.Constraint_constants.t
26

27
  val consensus_constants : Consensus.Constants.t
28

29
  val compile_config : Mina_compile_config.t
30
end
31

32
(* TODO: calculate a sensible value from postake consensus arguments *)
33
let catchup_timeout_duration (precomputed_values : Precomputed_values.t) =
34
  Block_time.Span.of_ms
×
35
    ( (precomputed_values.genesis_constants.protocol.delta + 1)
36
      * precomputed_values.constraint_constants.block_window_duration_ms
37
    |> Int64.of_int )
×
38
  |> Block_time.Span.min (Block_time.Span.of_ms (Int64.of_int 5000))
×
39

40
let cached_transform_deferred_result ~transform_cached ~transform_result cached
41
    =
42
  Cached.transform cached ~f:transform_cached
×
43
  |> Cached.sequence_deferred
×
44
  >>= Fn.compose transform_result Cached.sequence_result
×
45

46
(* add a breadcrumb and perform post processing *)
47
let add_and_finalize ~logger ~frontier ~catchup_scheduler
48
    ~processed_transition_writer ~only_if_present ~time_controller ~source
49
    ~valid_cb cached_breadcrumb ~(precomputed_values : Precomputed_values.t) =
UNCOV
50
  let module Inclusion_time = Mina_metrics.Block_latency.Inclusion_time in
×
51
  let breadcrumb =
52
    if Cached.is_pure cached_breadcrumb then Cached.peek cached_breadcrumb
×
53
    else Cached.invalidate_with_success cached_breadcrumb
×
54
  in
55
  let consensus_constants = precomputed_values.consensus_constants in
56
  let transition =
57
    Transition_frontier.Breadcrumb.validated_transition breadcrumb
58
  in
59
  [%log debug] "add_and_finalize $state_hash %s callback"
×
60
    ~metadata:
61
      [ ( "state_hash"
62
        , Transition_frontier.Breadcrumb.state_hash breadcrumb
×
63
          |> State_hash.to_yojson )
×
64
      ]
65
    (Option.value_map valid_cb ~default:"without" ~f:(const "with")) ;
×
66
  let state_hash = Transition_frontier.Breadcrumb.state_hash breadcrumb in
×
67
  Internal_tracing.with_state_hash state_hash
×
68
  @@ fun () ->
69
  [%log internal] "Add_and_finalize" ;
×
70
  let%map () =
71
    if only_if_present then (
×
72
      let parent_hash = Transition_frontier.Breadcrumb.parent_hash breadcrumb in
73
      match Transition_frontier.find frontier parent_hash with
×
74
      | Some _ ->
×
75
          Transition_frontier.add_breadcrumb_exn frontier breadcrumb
×
76
      | None ->
×
77
          [%log internal] "Parent_breadcrumb_not_found" ;
×
78
          [%log warn]
×
79
            !"When trying to add breadcrumb, its parent had been removed from \
×
80
              transition frontier: %{sexp: State_hash.t}"
81
            parent_hash ;
82
          Deferred.unit )
×
83
    else Transition_frontier.add_breadcrumb_exn frontier breadcrumb
×
84
  in
85
  ( match source with
×
86
  | `Internal ->
×
87
      ()
88
  | _ ->
×
89
      let transition_time =
90
        transition |> Mina_block.Validated.header
×
91
        |> Mina_block.Header.protocol_state |> Protocol_state.consensus_state
×
92
        |> Consensus.Data.Consensus_state.consensus_time
93
      in
94
      let time_elapsed =
×
95
        Block_time.diff
96
          (Block_time.now time_controller)
×
97
          (Consensus.Data.Consensus_time.to_time ~constants:consensus_constants
×
98
             transition_time )
99
      in
100
      Inclusion_time.update (Block_time.Span.to_time_span time_elapsed) ) ;
×
101
  [%log internal] "Add_and_finalize_done" ;
×
102
  if Writer.is_closed processed_transition_writer then
×
103
    Or_error.error_string "processed transitions closed"
×
104
  else (
×
105
    Writer.write processed_transition_writer
106
      (`Transition transition, `Source source, `Valid_cb valid_cb) ;
107
    Catchup_scheduler.notify catchup_scheduler
×
108
      ~hash:(Mina_block.Validated.state_hash transition) )
×
109

110
let process_transition ~context:(module Context : CONTEXT) ~trust_system
111
    ~verifier ~get_completed_work ~frontier ~catchup_scheduler
112
    ~processed_transition_writer ~time_controller ~block_or_header ~valid_cb =
113
  let is_block_in_frontier =
3✔
114
    Fn.compose Option.is_some @@ Transition_frontier.find frontier
3✔
115
  in
116
  let open Context in
3✔
117
  let header, transition_hash, transition_receipt_time, sender, validation =
118
    match block_or_header with
119
    | `Block cached_env ->
3✔
120
        let env = Cached.peek cached_env in
121
        let block, v = Envelope.Incoming.data env in
3✔
122
        ( Mina_block.header @@ With_hash.data block
3✔
123
        , State_hash.With_state_hashes.state_hash block
3✔
124
        , Some (Envelope.Incoming.received_at env)
3✔
125
        , Envelope.Incoming.sender env
3✔
126
        , v )
127
    | `Header env ->
×
128
        let h, v = Envelope.Incoming.data env in
129
        ( With_hash.data h
×
130
        , State_hash.With_state_hashes.state_hash h
×
131
        , Some (Envelope.Incoming.received_at env)
×
132
        , Envelope.Incoming.sender env
×
133
        , v )
134
  in
135
  let parent_hash =
136
    Protocol_state.previous_state_hash (Header.protocol_state header)
3✔
137
  in
138
  let root_block =
3✔
139
    Transition_frontier.(Breadcrumb.block_with_hash @@ root frontier)
3✔
140
  in
141
  let metadata = [ ("state_hash", State_hash.to_yojson transition_hash) ] in
3✔
142
  let state_hash = transition_hash in
143
  Internal_tracing.with_state_hash state_hash
144
  @@ fun () ->
145
  [%log internal] "@block_metadata"
3✔
146
    ~metadata:
147
      [ ( "blockchain_length"
148
        , Mina_numbers.Length.to_yojson (Header.blockchain_length header) )
3✔
149
      ] ;
150
  [%log internal] "Begin_external_block_processing" ;
3✔
151
  let handle_not_selected () =
3✔
152
    [%log internal] "Failure"
×
153
      ~metadata:[ ("reason", `String "Not_selected_over_frontier_root") ] ;
154
    Trust_system.record_envelope_sender trust_system logger sender
×
155
      ( Trust_system.Actions.Gossiped_invalid_transition
156
      , Some
157
          ( "The transition with hash $state_hash was not selected over the \
158
             transition frontier root"
159
          , metadata ) )
160
  in
161
  match block_or_header with
162
  | `Header env -> (
×
163
      let header_with_hash, _ = Envelope.Incoming.data env in
164
      [%log internal] "Validate_frontier_dependencies" ;
×
165
      match
×
166
        Mina_block.Validation.validate_frontier_dependencies
167
          ~context:(module Context)
168
          ~root_block ~is_block_in_frontier ~to_header:ident
169
          (Envelope.Incoming.data env)
×
170
      with
171
      | Ok _ | Error `Parent_missing_from_frontier ->
×
172
          [%log internal] "Schedule_catchup" ;
×
173
          Catchup_scheduler.watch_header catchup_scheduler ~valid_cb
×
174
            ~header_with_hash ;
175
          return ()
×
176
      | Error `Not_selected_over_frontier_root ->
×
177
          handle_not_selected ()
178
      | Error `Already_in_frontier ->
×
179
          [%log internal] "Failure"
×
180
            ~metadata:[ ("reason", `String "Already_in_frontier") ] ;
181
          [%log warn] ~metadata
×
182
            "Refusing to process the transition with hash $state_hash because \
183
             is is already in the transition frontier" ;
184
          return () )
×
185
  | `Block cached_initially_validated_transition ->
3✔
186
      Deferred.ignore_m
187
      @@
188
      let open Deferred.Result.Let_syntax in
189
      let%bind mostly_validated_transition =
190
        let open Deferred.Let_syntax in
191
        let initially_validated_transition =
192
          Cached.peek cached_initially_validated_transition
3✔
193
          |> Envelope.Incoming.data
194
        in
195
        [%log internal] "Validate_frontier_dependencies" ;
3✔
196
        match
3✔
197
          Mina_block.Validation.validate_frontier_dependencies
198
            ~context:(module Context)
199
            ~root_block ~is_block_in_frontier ~to_header:Mina_block.header
200
            initially_validated_transition
201
        with
202
        | Ok t ->
×
203
            return (Ok t)
×
204
        | Error `Not_selected_over_frontier_root ->
×
205
            let (_ : Mina_block.initial_valid_block Envelope.Incoming.t) =
206
              Cached.invalidate_with_failure
207
                cached_initially_validated_transition
208
            in
209
            let%map () = handle_not_selected () in
×
210
            Error ()
×
211
        | Error `Already_in_frontier ->
3✔
212
            [%log internal] "Failure"
3✔
213
              ~metadata:[ ("reason", `String "Already_in_frontier") ] ;
214
            [%log warn] ~metadata
3✔
215
              "Refusing to process the transition with hash $state_hash \
216
               because is is already in the transition frontier" ;
217
            let (_ : Mina_block.initial_valid_block Envelope.Incoming.t) =
3✔
218
              Cached.invalidate_with_failure
219
                cached_initially_validated_transition
220
            in
221
            return (Error ())
3✔
222
        | Error `Parent_missing_from_frontier -> (
×
223
            [%log internal] "Schedule_catchup" ;
×
224
            match validation with
×
225
            | ( _
×
226
              , _
227
              , _
228
              , (`Delta_block_chain, Truth.True delta_state_hashes)
229
              , _
230
              , _
231
              , _ ) ->
232
                let timeout_duration =
233
                  Option.fold
234
                    (Transition_frontier.find frontier
×
235
                       (Mina_stdlib.Nonempty_list.head delta_state_hashes) )
×
236
                    ~init:(Block_time.Span.of_ms 0L)
×
237
                    ~f:(fun _ _ -> catchup_timeout_duration precomputed_values)
×
238
                in
239
                Catchup_scheduler.watch catchup_scheduler ~timeout_duration
×
240
                  ~cached_transition:cached_initially_validated_transition
241
                  ~valid_cb ;
242
                return (Error ()) )
×
243
      in
244
      (* TODO: only access parent in transition frontier once (already done in call to validate dependencies) #2485 *)
245
      [%log internal] "Find_parent_breadcrumb" ;
×
246
      let parent_breadcrumb =
×
247
        Transition_frontier.find_exn frontier parent_hash
248
      in
249
      let%bind breadcrumb =
250
        cached_transform_deferred_result cached_initially_validated_transition
×
251
          ~transform_cached:(fun _ ->
252
            Transition_frontier.Breadcrumb.build ~logger ~precomputed_values
×
253
              ~verifier ~get_completed_work ~trust_system
254
              ~transition_receipt_time ~sender:(Some sender)
255
              ~parent:parent_breadcrumb ~transition:mostly_validated_transition
256
              (* TODO: Can we skip here? *) () )
257
          ~transform_result:(function
258
            | Error (`Invalid_staged_ledger_hash error)
×
259
            | Error (`Invalid_staged_ledger_diff error) ->
×
260
                Internal_tracing.with_state_hash state_hash
261
                @@ fun () ->
262
                [%log internal] "Failure"
×
263
                  ~metadata:[ ("reason", `String (Error.to_string_hum error)) ] ;
×
264
                [%log error]
×
265
                  ~metadata:
266
                    (metadata @ [ ("error", Error_json.error_to_yojson error) ])
×
267
                  "Error while building breadcrumb in the transition handler \
268
                   processor: $error" ;
269
                Deferred.return (Error ())
×
270
            | Error (`Fatal_error exn) ->
×
271
                Internal_tracing.with_state_hash state_hash
272
                @@ fun () ->
273
                [%log internal] "Failure"
×
274
                  ~metadata:[ ("reason", `String "Fatal error") ] ;
275
                raise exn
×
276
            | Ok breadcrumb ->
×
277
                Deferred.return (Ok breadcrumb) )
278
      in
279
      Mina_metrics.(
×
280
        Counter.inc_one
×
281
          Transition_frontier_controller.breadcrumbs_built_by_processor) ;
282
      let%map.Deferred result =
283
        add_and_finalize ~logger ~frontier ~catchup_scheduler
×
284
          ~processed_transition_writer ~only_if_present:false ~time_controller
285
          ~source:`Gossip breadcrumb ~precomputed_values ~valid_cb
286
      in
UNCOV
287
      ( match result with
×
288
      | Ok () ->
×
289
          [%log internal] "Breadcrumb_integrated"
×
290
      | Error err ->
×
291
          [%log internal] "Failure"
×
292
            ~metadata:[ ("reason", `String (Error.to_string_hum err)) ] ) ;
×
293
      Result.return result
294

295
let run ~context:(module Context : CONTEXT) ~verifier ~trust_system
296
    ~time_controller ~frontier ~get_completed_work
297
    ~(primary_transition_reader :
298
       ( [ `Block of
299
           ( Mina_block.initial_valid_block Envelope.Incoming.t
300
           , State_hash.t )
301
           Cached.t
302
         | `Header of Mina_block.initial_valid_header Envelope.Incoming.t ]
303
       * [ `Valid_cb of Mina_net2.Validation_callback.t option ] )
304
       Reader.t )
305
    ~(producer_transition_reader : Transition_frontier.Breadcrumb.t Reader.t)
306
    ~(clean_up_catchup_scheduler : unit Ivar.t) ~catchup_job_writer
307
    ~(catchup_breadcrumbs_reader :
308
       ( ( (Transition_frontier.Breadcrumb.t, State_hash.t) Cached.t
309
         * Mina_net2.Validation_callback.t option )
310
         Rose_tree.t
311
         list
312
       * [ `Ledger_catchup of unit Ivar.t | `Catchup_scheduler ] )
313
       Reader.t )
314
    ~(catchup_breadcrumbs_writer :
315
       ( ( (Transition_frontier.Breadcrumb.t, State_hash.t) Cached.t
316
         * Mina_net2.Validation_callback.t option )
317
         Rose_tree.t
318
         list
319
         * [ `Ledger_catchup of unit Ivar.t | `Catchup_scheduler ]
320
       , crash buffered
321
       , unit )
322
       Writer.t ) ~processed_transition_writer =
323
  let open Context in
6✔
324
  let catchup_scheduler =
325
    Catchup_scheduler.create ~logger ~precomputed_values ~verifier ~trust_system
326
      ~frontier ~time_controller ~catchup_job_writer ~catchup_breadcrumbs_writer
327
      ~clean_up_signal:clean_up_catchup_scheduler
328
  in
329
  let add_and_finalize =
330
    add_and_finalize ~frontier ~catchup_scheduler ~processed_transition_writer
331
      ~time_controller ~precomputed_values
332
  in
333
  let process_transition =
334
    process_transition
335
      ~context:(module Context)
336
      ~get_completed_work ~trust_system ~verifier ~frontier ~catchup_scheduler
337
      ~processed_transition_writer ~time_controller
338
  in
339
  O1trace.background_thread "process_blocks" (fun () ->
340
      Reader.Merge.iter
6✔
341
        (* It is fine to skip the cache layer on blocks produced by this node
342
           * because it is extraordinarily unlikely we would write an internal bug
343
           * triggering this case, and the external case (where we received an
344
           * identical external transition from the network) can happen iff there
345
           * is another node with the exact same private key and view of the
346
           * transaction pool. *)
347
        [ Reader.map producer_transition_reader ~f:(fun breadcrumb ->
6✔
348
              Mina_metrics.(
×
349
                Gauge.inc_one
×
350
                  Transition_frontier_controller.transitions_being_processed) ;
351
              `Local_breadcrumb (Cached.pure breadcrumb) )
×
352
        ; Reader.map catchup_breadcrumbs_reader
6✔
353
            ~f:(fun (cb, catchup_breadcrumbs_callback) ->
354
              `Catchup_breadcrumbs (cb, catchup_breadcrumbs_callback) )
×
355
        ; Reader.map primary_transition_reader ~f:(fun vt ->
6✔
356
              `Partially_valid_transition vt )
3✔
357
        ]
358
        ~f:(fun msg ->
359
          let open Deferred.Let_syntax in
3✔
360
          O1trace.thread "transition_handler_processor" (fun () ->
361
              match msg with
3✔
362
              | `Catchup_breadcrumbs
×
363
                  (breadcrumb_subtrees, subsequent_callback_action) -> (
364
                  ( match%map
365
                      Deferred.Or_error.List.iter breadcrumb_subtrees
×
366
                        ~f:(fun subtree ->
367
                          Rose_tree.Deferred.Or_error.iter
×
368
                            subtree
369
                            (* It could be the case that by the time we try and
370
                               * add the breadcrumb, it's no longer relevant when
371
                               * we're catching up *) ~f:(fun (b, valid_cb) ->
372
                              let state_hash =
×
373
                                Frontier_base.Breadcrumb.state_hash
374
                                  (Cached.peek b)
×
375
                              in
376
                              let%map result =
377
                                add_and_finalize ~logger ~only_if_present:true
×
378
                                  ~source:`Catchup ~valid_cb b
379
                              in
UNCOV
380
                              Internal_tracing.with_state_hash state_hash
×
381
                              @@ fun () ->
382
                              ( match result with
×
383
                              | Error err ->
×
384
                                  [%log internal] "Failure"
×
385
                                    ~metadata:
386
                                      [ ( "reason"
387
                                        , `String (Error.to_string_hum err) )
×
388
                                      ]
389
                              | Ok () ->
×
390
                                  [%log internal] "Breadcrumb_integrated" ) ;
×
391
                              result ) )
392
                    with
393
                  | Ok () ->
×
394
                      ()
395
                  | Error err ->
×
396
                      List.iter breadcrumb_subtrees ~f:(fun tree ->
397
                          Rose_tree.iter tree
×
398
                            ~f:(fun (cached_breadcrumb, _vc) ->
399
                              let (_ : Transition_frontier.Breadcrumb.t) =
×
400
                                Cached.invalidate_with_failure cached_breadcrumb
401
                              in
402
                              () ) ) ;
×
403
                      [%log error]
×
404
                        "Error, failed to attach all catchup breadcrumbs to \
405
                         transition frontier: $error"
406
                        ~metadata:[ ("error", Error_json.error_to_yojson err) ]
×
407
                  )
408
                  >>| fun () ->
409
                  match subsequent_callback_action with
×
410
                  | `Ledger_catchup decrement_signal ->
×
411
                      if Ivar.is_full decrement_signal then
412
                        [%log error] "Ivar.fill bug is here!" ;
×
413
                      Ivar.fill decrement_signal ()
×
414
                  | `Catchup_scheduler ->
×
415
                      () )
416
              | `Local_breadcrumb breadcrumb ->
×
417
                  let state_hash =
418
                    Transition_frontier.Breadcrumb.validated_transition
×
419
                      (Cached.peek breadcrumb)
×
420
                    |> Mina_block.Validated.state_hash
421
                  in
422
                  Internal_tracing.with_state_hash state_hash
×
423
                  @@ fun () ->
424
                  [%log internal] "Begin_local_block_processing" ;
×
425
                  let transition_time =
×
426
                    Transition_frontier.Breadcrumb.validated_transition
×
427
                      (Cached.peek breadcrumb)
×
428
                    |> Mina_block.Validated.header
×
429
                    |> Mina_block.Header.protocol_state
×
430
                    |> Protocol_state.blockchain_state
×
431
                    |> Blockchain_state.timestamp |> Block_time.to_time_exn
×
432
                  in
433
                  Perf_histograms.add_span
×
434
                    ~name:"accepted_transition_local_latency"
435
                    (Core_kernel.Time.diff
×
436
                       Block_time.(now time_controller |> to_time_exn)
×
437
                       transition_time ) ;
438
                  let%map () =
439
                    match%map
440
                      add_and_finalize ~logger ~only_if_present:false
×
441
                        ~source:`Internal breadcrumb ~valid_cb:None
442
                    with
UNCOV
443
                    | Ok () ->
×
444
                        [%log internal] "Breadcrumb_integrated" ;
×
445
                        ()
×
446
                    | Error err ->
×
447
                        [%log internal] "Failure"
×
448
                          ~metadata:
449
                            [ ("reason", `String (Error.to_string_hum err)) ] ;
×
450
                        [%log error]
×
451
                          ~metadata:
452
                            [ ("error", Error_json.error_to_yojson err) ]
×
453
                          "Error, failed to attach produced breadcrumb to \
454
                           transition frontier: $error" ;
455
                        let (_ : Transition_frontier.Breadcrumb.t) =
×
456
                          Cached.invalidate_with_failure breadcrumb
457
                        in
458
                        ()
×
459
                  in
460
                  Mina_metrics.(
×
461
                    Gauge.dec_one
462
                      Transition_frontier_controller.transitions_being_processed)
463
              | `Partially_valid_transition (block_or_header, `Valid_cb valid_cb)
3✔
464
                ->
465
                  process_transition ~block_or_header ~valid_cb ) ) )
466

467
let%test_module "Transition_handler.Processor tests" =
468
  ( module struct
469
    open Async
470
    open Pipe_lib
471

472
    let () =
473
      Backtrace.elide := false ;
474
      Printexc.record_backtrace true ;
475
      Async.Scheduler.set_record_backtraces true
×
476

477
    let logger = Logger.create ()
×
478

479
    let precomputed_values = Lazy.force Precomputed_values.for_unit_tests
×
480

481
    let proof_level = precomputed_values.proof_level
482

483
    let constraint_constants = precomputed_values.constraint_constants
484

485
    let time_controller = Block_time.Controller.basic ~logger
486

487
    let trust_system = Trust_system.null ()
×
488

489
    let compile_config = Mina_compile_config.For_unit_tests.t
490

491
    let verifier =
492
      Async.Thread_safe.block_on_async_exn (fun () ->
×
493
          Verifier.For_tests.default ~constraint_constants ~logger ~proof_level
×
494
            () )
495

496
    module Context = struct
497
      let logger = logger
498

499
      let precomputed_values = precomputed_values
500

501
      let constraint_constants = constraint_constants
502

503
      let consensus_constants = precomputed_values.consensus_constants
504

505
      let compile_config = compile_config
506
    end
507

508
    let downcast_breadcrumb breadcrumb =
509
      let transition =
×
510
        Transition_frontier.Breadcrumb.validated_transition breadcrumb
×
511
        |> Mina_block.Validated.remember
×
512
        |> Mina_block.Validation.reset_frontier_dependencies_validation
×
513
        |> Mina_block.Validation.reset_staged_ledger_diff_validation
514
      in
515
      Envelope.Incoming.wrap ~data:transition ~sender:Envelope.Sender.Local
×
516

517
    let%test_unit "adding transitions whose parents are in the frontier" =
518
      let frontier_size = 1 in
×
519
      let branch_size = 10 in
520
      let max_length = frontier_size + branch_size in
521
      Quickcheck.test ~trials:4
522
        (Transition_frontier.For_tests.gen_with_branch ~precomputed_values
×
523
           ~verifier ~max_length ~frontier_size ~branch_size () )
524
        ~f:(fun (frontier, branch) ->
525
          assert (
×
526
            Thread_safe.block_on_async_exn (fun () ->
×
527
                let valid_transition_reader, valid_transition_writer =
×
528
                  Strict_pipe.create
529
                    (Buffered
530
                       (`Capacity branch_size, `Overflow (Drop_head ignore)) )
531
                in
532
                let producer_transition_reader, _ =
×
533
                  Strict_pipe.create
534
                    (Buffered
535
                       (`Capacity branch_size, `Overflow (Drop_head ignore)) )
536
                in
537
                let _, catchup_job_writer =
×
538
                  Strict_pipe.create (Buffered (`Capacity 1, `Overflow Crash))
539
                in
540
                let catchup_breadcrumbs_reader, catchup_breadcrumbs_writer =
×
541
                  Strict_pipe.create (Buffered (`Capacity 1, `Overflow Crash))
542
                in
543
                let processed_transition_reader, processed_transition_writer =
×
544
                  Strict_pipe.create
545
                    (Buffered
546
                       (`Capacity branch_size, `Overflow (Drop_head ignore)) )
547
                in
548
                let clean_up_catchup_scheduler = Ivar.create () in
×
549
                let cache =
×
550
                  Unprocessed_transition_cache.create ~logger
551
                    ~cache_exceptions:true
552
                in
553
                run
554
                  ~context:(module Context)
555
                  ~time_controller ~verifier ~get_completed_work:(Fn.const None)
×
556
                  ~trust_system ~clean_up_catchup_scheduler ~frontier
557
                  ~primary_transition_reader:valid_transition_reader
558
                  ~producer_transition_reader ~catchup_job_writer
559
                  ~catchup_breadcrumbs_reader ~catchup_breadcrumbs_writer
560
                  ~processed_transition_writer ;
561
                List.iter branch ~f:(fun breadcrumb ->
562
                    let b =
×
563
                      downcast_breadcrumb breadcrumb
×
564
                      |> Unprocessed_transition_cache.register_exn cache
565
                    in
566
                    Strict_pipe.Writer.write valid_transition_writer
×
567
                      (`Block b, `Valid_cb None) ) ;
568
                match%map
569
                  Block_time.Timeout.await
×
570
                    ~timeout_duration:(Block_time.Span.of_ms 30000L)
×
571
                    time_controller
572
                    (Strict_pipe.Reader.fold_until processed_transition_reader
×
573
                       ~init:branch
574
                       ~f:(fun
575
                            remaining_breadcrumbs
576
                            (`Transition newly_added_transition, _, _)
577
                          ->
578
                         Deferred.return
×
579
                           ( match remaining_breadcrumbs with
580
                           | next_expected_breadcrumb :: tail ->
×
581
                               [%test_eq: State_hash.t]
×
582
                                 (Transition_frontier.Breadcrumb.state_hash
×
583
                                    next_expected_breadcrumb )
584
                                 (Mina_block.Validated.state_hash
×
585
                                    newly_added_transition ) ;
586
                               [%log info]
×
587
                                 ~metadata:
588
                                   [ ( "height"
589
                                     , `Int
590
                                         ( newly_added_transition
591
                                         |> Mina_block.Validated.forget
×
592
                                         |> With_hash.data |> Mina_block.header
×
593
                                         |> Mina_block.Header.protocol_state
×
594
                                         |> Protocol_state.consensus_state
×
595
                                         |> Consensus.Data.Consensus_state
596
                                            .blockchain_length
×
597
                                         |> Mina_numbers.Length.to_uint32
×
598
                                         |> Unsigned.UInt32.to_int ) )
×
599
                                   ]
600
                                 "transition of $height passed processor" ;
601
                               if List.is_empty tail then `Stop true
×
602
                               else `Continue tail
×
603
                           | [] ->
×
604
                               `Stop false ) ) )
605
                with
606
                | `Timeout ->
×
607
                    failwith "test timed out"
608
                | `Ok (`Eof _) ->
×
609
                    failwith "pipe closed unexpectedly"
610
                | `Ok (`Terminated x) ->
×
611
                    x ) ) )
612
  end )
4✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc