• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

MinaProtocol / mina / 2837

30 Oct 2024 07:56AM UTC coverage: 38.267% (-22.8%) from 61.098%
2837

push

buildkite

web-flow
Merge pull request #16306 from MinaProtocol/dkijania/fix_promotion_to_gcr

Fix promotion job PUBLISH misuse

7417 of 19382 relevant lines covered (38.27%)

298565.44 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.9
/src/lib/transition_handler/processor.ml
1
(** This module contains the transition processor. The transition processor is
2
 *  the thread in which transitions are attached to the transition frontier.
3
 *
4
 *  Two types of data are handled by the transition processor: validated external transitions
5
 *  with precomputed state hashes (via the block producer and validator pipes)
6
 *  and breadcrumb rose trees (via the catchup pipe).
7
 *)
8

9
(* Only show stdout for failed inline tests. *)
1✔
10
open Inline_test_quiet_logs
11
open Core_kernel
12
open Async_kernel
13
open Pipe_lib.Strict_pipe
14
open Mina_base
15
open Mina_state
16
open Cache_lib
17
open Mina_block
18
open Network_peer
19

20
(** Environment passed as a first-class module ([~context:(module Context)])
    to [process_transition] and [run] in this file. It bundles the logger
    together with the protocol/compile-time constants those functions read
    after [let open Context in]. *)
module type CONTEXT = sig
21
  val logger : Logger.t
22

23
  val precomputed_values : Precomputed_values.t
24

25
  val constraint_constants : Genesis_constants.Constraint_constants.t
26

27
  val consensus_constants : Consensus.Constants.t
28

29
  val compile_config : Mina_compile_config.t
30
end
31

32
(* TODO: calculate a sensible value from postake consensus arguments *)
33
(* Timeout handed to [Catchup_scheduler.watch] when a transition's parent is
   missing: [(delta + 1)] block windows expressed in milliseconds, capped at
   5000 ms. *)
let catchup_timeout_duration (precomputed_values : Precomputed_values.t) =
  let delta = precomputed_values.genesis_constants.protocol.delta in
  let window_ms =
    precomputed_values.constraint_constants.block_window_duration_ms
  in
  let uncapped =
    Block_time.Span.of_ms (Int64.of_int ((delta + 1) * window_ms))
  in
  let cap = Block_time.Span.of_ms (Int64.of_int 5000) in
  Block_time.Span.min cap uncapped
×
39

40
(* Applies [transform_cached] inside the cache wrapper, pulls the resulting
   deferred and then the result out of the [Cached.t], and finally hands the
   sequenced result to [transform_result]. *)
let cached_transform_deferred_result ~transform_cached ~transform_result cached
    =
  let transformed = Cached.transform cached ~f:transform_cached in
  Deferred.bind
    (Cached.sequence_deferred transformed)
    ~f:(fun sequenced -> transform_result (Cached.sequence_result sequenced))
×
45

46
(* add a breadcrumb and perform post processing *)
47
let add_and_finalize ~logger ~frontier ~catchup_scheduler
48
    ~processed_transition_writer ~only_if_present ~time_controller ~source
49
    ~valid_cb cached_breadcrumb ~(precomputed_values : Precomputed_values.t)
50
    ~block_window_duration:_ (*TODO remove unused var*) =
51
  let module Inclusion_time = Mina_metrics.Block_latency.Inclusion_time in
×
52
  let breadcrumb =
53
    if Cached.is_pure cached_breadcrumb then Cached.peek cached_breadcrumb
×
54
    else Cached.invalidate_with_success cached_breadcrumb
×
55
  in
56
  let consensus_constants = precomputed_values.consensus_constants in
57
  let transition =
58
    Transition_frontier.Breadcrumb.validated_transition breadcrumb
59
  in
60
  [%log debug] "add_and_finalize $state_hash %s callback"
×
61
    ~metadata:
62
      [ ( "state_hash"
63
        , Transition_frontier.Breadcrumb.state_hash breadcrumb
×
64
          |> State_hash.to_yojson )
×
65
      ]
66
    (Option.value_map valid_cb ~default:"without" ~f:(const "with")) ;
×
67
  let state_hash = Transition_frontier.Breadcrumb.state_hash breadcrumb in
×
68
  Internal_tracing.with_state_hash state_hash
×
69
  @@ fun () ->
70
  [%log internal] "Add_and_finalize" ;
×
71
  let%map () =
72
    if only_if_present then (
×
73
      let parent_hash = Transition_frontier.Breadcrumb.parent_hash breadcrumb in
74
      match Transition_frontier.find frontier parent_hash with
×
75
      | Some _ ->
×
76
          Transition_frontier.add_breadcrumb_exn frontier breadcrumb
×
77
      | None ->
×
78
          [%log internal] "Parent_breadcrumb_not_found" ;
×
79
          [%log warn]
×
80
            !"When trying to add breadcrumb, its parent had been removed from \
×
81
              transition frontier: %{sexp: State_hash.t}"
82
            parent_hash ;
83
          Deferred.unit )
×
84
    else Transition_frontier.add_breadcrumb_exn frontier breadcrumb
×
85
  in
86
  ( match source with
×
87
  | `Internal ->
×
88
      ()
89
  | _ ->
×
90
      let transition_time =
91
        transition |> Mina_block.Validated.header
×
92
        |> Mina_block.Header.protocol_state |> Protocol_state.consensus_state
×
93
        |> Consensus.Data.Consensus_state.consensus_time
94
      in
95
      let time_elapsed =
×
96
        Block_time.diff
97
          (Block_time.now time_controller)
×
98
          (Consensus.Data.Consensus_time.to_time ~constants:consensus_constants
×
99
             transition_time )
100
      in
101
      Inclusion_time.update (Block_time.Span.to_time_span time_elapsed) ) ;
×
102
  [%log internal] "Add_and_finalize_done" ;
×
103
  if Writer.is_closed processed_transition_writer then
×
104
    Or_error.error_string "processed transitions closed"
×
105
  else (
×
106
    Writer.write processed_transition_writer
107
      (`Transition transition, `Source source, `Valid_cb valid_cb) ;
108
    Catchup_scheduler.notify catchup_scheduler
×
109
      ~hash:(Mina_block.Validated.state_hash transition) )
×
110

111
let process_transition ~context:(module Context : CONTEXT) ~trust_system
112
    ~verifier ~get_completed_work ~frontier ~catchup_scheduler
113
    ~processed_transition_writer ~time_controller
114
    ~transition:cached_initially_validated_transition ~valid_cb =
115
  let open Context in
×
116
  let enveloped_initially_validated_transition =
117
    Cached.peek cached_initially_validated_transition
118
  in
119
  let transition_receipt_time =
×
120
    Some
121
      (Envelope.Incoming.received_at enveloped_initially_validated_transition)
×
122
  in
123
  let sender =
124
    Envelope.Incoming.sender enveloped_initially_validated_transition
125
  in
126
  let initially_validated_transition =
×
127
    Envelope.Incoming.data enveloped_initially_validated_transition
128
  in
129
  let transition_hash, transition =
×
130
    let t, _ = initially_validated_transition in
131
    (State_hash.With_state_hashes.state_hash t, With_hash.data t)
×
132
  in
133
  let metadata = [ ("state_hash", State_hash.to_yojson transition_hash) ] in
×
134
  let state_hash = transition_hash in
135
  Internal_tracing.with_state_hash state_hash
136
  @@ fun () ->
137
  [%log internal] "@block_metadata"
×
138
    ~metadata:
139
      [ ( "blockchain_length"
140
        , Mina_numbers.Length.to_yojson
×
141
            (Mina_block.blockchain_length transition) )
×
142
      ] ;
143
  [%log internal] "Begin_external_block_processing" ;
×
144
  Deferred.map ~f:(Fn.const ())
×
145
    (let open Deferred.Result.Let_syntax in
146
    [%log internal] "Validate_frontier_dependencies" ;
×
147
    let%bind mostly_validated_transition =
148
      let open Deferred.Let_syntax in
149
      match
150
        Mina_block.Validation.validate_frontier_dependencies
151
          ~context:(module Context)
152
          ~root_block:
153
            Transition_frontier.(Breadcrumb.block_with_hash @@ root frontier)
×
154
          ~get_block_by_hash:
155
            Transition_frontier.(
156
              Fn.compose (Option.map ~f:Breadcrumb.block_with_hash)
×
157
              @@ find frontier)
×
158
          initially_validated_transition
159
      with
160
      | Ok t ->
×
161
          return (Ok t)
×
162
      | Error `Not_selected_over_frontier_root ->
×
163
          [%log internal] "Failure"
×
164
            ~metadata:[ ("reason", `String "Not_selected_over_frontier_root") ] ;
165
          let%map () =
166
            Trust_system.record_envelope_sender trust_system logger sender
×
167
              ( Trust_system.Actions.Gossiped_invalid_transition
168
              , Some
169
                  ( "The transition with hash $state_hash was not selected \
170
                     over the transition frontier root"
171
                  , metadata ) )
172
          in
173
          let (_ : Mina_block.initial_valid_block Envelope.Incoming.t) =
×
174
            Cached.invalidate_with_failure cached_initially_validated_transition
175
          in
176
          Error ()
×
177
      | Error `Already_in_frontier ->
×
178
          [%log internal] "Failure"
×
179
            ~metadata:[ ("reason", `String "Already_in_frontier") ] ;
180
          [%log warn] ~metadata
×
181
            "Refusing to process the transition with hash $state_hash because \
182
             is is already in the transition frontier" ;
183
          let (_ : Mina_block.initial_valid_block Envelope.Incoming.t) =
×
184
            Cached.invalidate_with_failure cached_initially_validated_transition
185
          in
186
          return (Error ())
×
187
      | Error `Parent_missing_from_frontier -> (
×
188
          [%log internal] "Schedule_catchup" ;
×
189
          let _, validation =
×
190
            Cached.peek cached_initially_validated_transition
×
191
            |> Envelope.Incoming.data
192
          in
193
          match validation with
×
194
          | ( _
×
195
            , _
196
            , _
197
            , (`Delta_block_chain, Truth.True delta_state_hashes)
198
            , _
199
            , _
200
            , _ ) ->
201
              let timeout_duration =
202
                Option.fold
203
                  (Transition_frontier.find frontier
×
204
                     (Mina_stdlib.Nonempty_list.head delta_state_hashes) )
×
205
                  ~init:(Block_time.Span.of_ms 0L)
×
206
                  ~f:(fun _ _ -> catchup_timeout_duration precomputed_values)
×
207
              in
208
              Catchup_scheduler.watch catchup_scheduler ~timeout_duration
×
209
                ~cached_transition:cached_initially_validated_transition
210
                ~valid_cb
211
                ~block_window_duration:compile_config.block_window_duration ;
212
              return (Error ()) )
×
213
    in
214
    (* TODO: only access parent in transition frontier once (already done in call to validate dependencies) #2485 *)
215
    let parent_hash =
×
216
      Protocol_state.previous_state_hash
217
        (Header.protocol_state @@ Mina_block.header transition)
×
218
    in
219
    [%log internal] "Find_parent_breadcrumb" ;
×
220
    let parent_breadcrumb = Transition_frontier.find_exn frontier parent_hash in
×
221
    let%bind breadcrumb =
222
      cached_transform_deferred_result cached_initially_validated_transition
×
223
        ~transform_cached:(fun _ ->
224
          Transition_frontier.Breadcrumb.build ~logger ~precomputed_values
×
225
            ~verifier ~get_completed_work ~trust_system ~transition_receipt_time
226
            ~sender:(Some sender) ~parent:parent_breadcrumb
227
            ~transition:mostly_validated_transition
228
            (* TODO: Can we skip here? *) () )
229
        ~transform_result:(function
230
          | Error (`Invalid_staged_ledger_hash error)
×
231
          | Error (`Invalid_staged_ledger_diff error) ->
×
232
              Internal_tracing.with_state_hash state_hash
233
              @@ fun () ->
234
              [%log internal] "Failure"
×
235
                ~metadata:[ ("reason", `String (Error.to_string_hum error)) ] ;
×
236
              [%log error]
×
237
                ~metadata:
238
                  (metadata @ [ ("error", Error_json.error_to_yojson error) ])
×
239
                "Error while building breadcrumb in the transition handler \
240
                 processor: $error" ;
241
              Deferred.return (Error ())
×
242
          | Error (`Fatal_error exn) ->
×
243
              Internal_tracing.with_state_hash state_hash
244
              @@ fun () ->
245
              [%log internal] "Failure"
×
246
                ~metadata:[ ("reason", `String "Fatal error") ] ;
247
              raise exn
×
248
          | Ok breadcrumb ->
×
249
              Deferred.return (Ok breadcrumb) )
250
    in
251
    Mina_metrics.(
×
252
      Counter.inc_one
×
253
        Transition_frontier_controller.breadcrumbs_built_by_processor) ;
254
    let%map.Deferred result =
255
      add_and_finalize ~logger ~frontier ~catchup_scheduler
×
256
        ~processed_transition_writer ~only_if_present:false ~time_controller
257
        ~source:`Gossip breadcrumb ~precomputed_values ~valid_cb
258
        ~block_window_duration:compile_config.block_window_duration
259
    in
260
    ( match result with
×
261
    | Ok () ->
×
262
        [%log internal] "Breadcrumb_integrated"
×
263
    | Error err ->
×
264
        [%log internal] "Failure"
×
265
          ~metadata:[ ("reason", `String (Error.to_string_hum err)) ] ) ;
×
266
    Result.return result)
267

268
let run ~context:(module Context : CONTEXT) ~verifier ~trust_system
269
    ~time_controller ~frontier ~get_completed_work
270
    ~(primary_transition_reader :
271
       ( [ `Block of
272
           ( Mina_block.initial_valid_block Envelope.Incoming.t
273
           , State_hash.t )
274
           Cached.t ]
275
       * [ `Valid_cb of Mina_net2.Validation_callback.t option ] )
276
       Reader.t )
277
    ~(producer_transition_reader : Transition_frontier.Breadcrumb.t Reader.t)
278
    ~(clean_up_catchup_scheduler : unit Ivar.t) ~catchup_job_writer
279
    ~(catchup_breadcrumbs_reader :
280
       ( ( (Transition_frontier.Breadcrumb.t, State_hash.t) Cached.t
281
         * Mina_net2.Validation_callback.t option )
282
         Rose_tree.t
283
         list
284
       * [ `Ledger_catchup of unit Ivar.t | `Catchup_scheduler ] )
285
       Reader.t )
286
    ~(catchup_breadcrumbs_writer :
287
       ( ( (Transition_frontier.Breadcrumb.t, State_hash.t) Cached.t
288
         * Mina_net2.Validation_callback.t option )
289
         Rose_tree.t
290
         list
291
         * [ `Ledger_catchup of unit Ivar.t | `Catchup_scheduler ]
292
       , crash buffered
293
       , unit )
294
       Writer.t ) ~processed_transition_writer =
295
  let open Context in
×
296
  let catchup_scheduler =
297
    Catchup_scheduler.create ~logger ~precomputed_values ~verifier ~trust_system
298
      ~frontier ~time_controller ~catchup_job_writer ~catchup_breadcrumbs_writer
299
      ~clean_up_signal:clean_up_catchup_scheduler
300
  in
301
  let add_and_finalize =
302
    add_and_finalize ~frontier ~catchup_scheduler ~processed_transition_writer
303
      ~time_controller ~precomputed_values
304
  in
305
  let process_transition =
306
    process_transition
307
      ~context:(module Context)
308
      ~get_completed_work ~trust_system ~verifier ~frontier ~catchup_scheduler
309
      ~processed_transition_writer ~time_controller
310
  in
311
  O1trace.background_thread "process_blocks" (fun () ->
312
      Reader.Merge.iter
×
313
        (* It is fine to skip the cache layer on blocks produced by this node
314
           * because it is extraordinarily unlikely we would write an internal bug
315
           * triggering this case, and the external case (where we received an
316
           * identical external transition from the network) can happen iff there
317
           * is another node with the exact same private key and view of the
318
           * transaction pool. *)
319
        [ Reader.map producer_transition_reader ~f:(fun breadcrumb ->
×
320
              Mina_metrics.(
×
321
                Gauge.inc_one
×
322
                  Transition_frontier_controller.transitions_being_processed) ;
323
              `Local_breadcrumb (Cached.pure breadcrumb) )
×
324
        ; Reader.map catchup_breadcrumbs_reader
×
325
            ~f:(fun (cb, catchup_breadcrumbs_callback) ->
326
              `Catchup_breadcrumbs (cb, catchup_breadcrumbs_callback) )
×
327
        ; Reader.map primary_transition_reader ~f:(fun vt ->
×
328
              `Partially_valid_transition vt )
×
329
        ]
330
        ~f:(fun msg ->
331
          let open Deferred.Let_syntax in
×
332
          O1trace.thread "transition_handler_processor" (fun () ->
333
              match msg with
×
334
              | `Catchup_breadcrumbs
×
335
                  (breadcrumb_subtrees, subsequent_callback_action) -> (
336
                  ( match%map
337
                      Deferred.Or_error.List.iter breadcrumb_subtrees
×
338
                        ~f:(fun subtree ->
339
                          Rose_tree.Deferred.Or_error.iter
×
340
                            subtree
341
                            (* It could be the case that by the time we try and
342
                               * add the breadcrumb, it's no longer relevant when
343
                               * we're catching up *) ~f:(fun (b, valid_cb) ->
344
                              let state_hash =
×
345
                                Frontier_base.Breadcrumb.state_hash
346
                                  (Cached.peek b)
×
347
                              in
348
                              let%map result =
349
                                add_and_finalize ~logger ~only_if_present:true
×
350
                                  ~source:`Catchup ~valid_cb b
351
                                  ~block_window_duration:
352
                                    compile_config.block_window_duration
353
                              in
354
                              Internal_tracing.with_state_hash state_hash
×
355
                              @@ fun () ->
356
                              ( match result with
×
357
                              | Error err ->
×
358
                                  [%log internal] "Failure"
×
359
                                    ~metadata:
360
                                      [ ( "reason"
361
                                        , `String (Error.to_string_hum err) )
×
362
                                      ]
363
                              | Ok () ->
×
364
                                  [%log internal] "Breadcrumb_integrated" ) ;
×
365
                              result ) )
366
                    with
367
                  | Ok () ->
×
368
                      ()
369
                  | Error err ->
×
370
                      List.iter breadcrumb_subtrees ~f:(fun tree ->
371
                          Rose_tree.iter tree
×
372
                            ~f:(fun (cached_breadcrumb, _vc) ->
373
                              let (_ : Transition_frontier.Breadcrumb.t) =
×
374
                                Cached.invalidate_with_failure cached_breadcrumb
375
                              in
376
                              () ) ) ;
×
377
                      [%log error]
×
378
                        "Error, failed to attach all catchup breadcrumbs to \
379
                         transition frontier: $error"
380
                        ~metadata:[ ("error", Error_json.error_to_yojson err) ]
×
381
                  )
382
                  >>| fun () ->
383
                  match subsequent_callback_action with
×
384
                  | `Ledger_catchup decrement_signal ->
×
385
                      if Ivar.is_full decrement_signal then
386
                        [%log error] "Ivar.fill bug is here!" ;
×
387
                      Ivar.fill decrement_signal ()
×
388
                  | `Catchup_scheduler ->
×
389
                      () )
390
              | `Local_breadcrumb breadcrumb ->
×
391
                  let state_hash =
392
                    Transition_frontier.Breadcrumb.validated_transition
×
393
                      (Cached.peek breadcrumb)
×
394
                    |> Mina_block.Validated.state_hash
395
                  in
396
                  Internal_tracing.with_state_hash state_hash
×
397
                  @@ fun () ->
398
                  [%log internal] "Begin_local_block_processing" ;
×
399
                  let transition_time =
×
400
                    Transition_frontier.Breadcrumb.validated_transition
×
401
                      (Cached.peek breadcrumb)
×
402
                    |> Mina_block.Validated.header
×
403
                    |> Mina_block.Header.protocol_state
×
404
                    |> Protocol_state.blockchain_state
×
405
                    |> Blockchain_state.timestamp |> Block_time.to_time_exn
×
406
                  in
407
                  Perf_histograms.add_span
×
408
                    ~name:"accepted_transition_local_latency"
409
                    (Core_kernel.Time.diff
×
410
                       Block_time.(now time_controller |> to_time_exn)
×
411
                       transition_time ) ;
412
                  let%map () =
413
                    match%map
414
                      add_and_finalize ~logger ~only_if_present:false
×
415
                        ~source:`Internal breadcrumb ~valid_cb:None
416
                        ~block_window_duration:
417
                          compile_config.block_window_duration
418
                    with
419
                    | Ok () ->
×
420
                        [%log internal] "Breadcrumb_integrated" ;
×
421
                        ()
×
422
                    | Error err ->
×
423
                        [%log internal] "Failure"
×
424
                          ~metadata:
425
                            [ ("reason", `String (Error.to_string_hum err)) ] ;
×
426
                        [%log error]
×
427
                          ~metadata:
428
                            [ ("error", Error_json.error_to_yojson err) ]
×
429
                          "Error, failed to attach produced breadcrumb to \
430
                           transition frontier: $error" ;
431
                        let (_ : Transition_frontier.Breadcrumb.t) =
×
432
                          Cached.invalidate_with_failure breadcrumb
433
                        in
434
                        ()
×
435
                  in
436
                  Mina_metrics.(
×
437
                    Gauge.dec_one
438
                      Transition_frontier_controller.transitions_being_processed)
439
              | `Partially_valid_transition
×
440
                  (`Block transition, `Valid_cb valid_cb) ->
441
                  process_transition ~transition ~valid_cb ) ) )
442

443
let%test_module "Transition_handler.Processor tests" =
444
  ( module struct
445
    open Async
446
    open Pipe_lib
447

448
    let () =
449
      Backtrace.elide := false ;
450
      Printexc.record_backtrace true ;
451
      Async.Scheduler.set_record_backtraces true
×
452

453
    let logger = Logger.create ()
×
454

455
    let precomputed_values = Lazy.force Precomputed_values.for_unit_tests
×
456

457
    let proof_level = precomputed_values.proof_level
458

459
    let constraint_constants = precomputed_values.constraint_constants
460

461
    let time_controller = Block_time.Controller.basic ~logger
462

463
    let trust_system = Trust_system.null ()
×
464

465
    let compile_config = Mina_compile_config.For_unit_tests.t
466

467
    let verifier =
468
      Async.Thread_safe.block_on_async_exn (fun () ->
×
469
          Verifier.create ~logger ~proof_level ~constraint_constants
×
470
            ~conf_dir:None
471
            ~pids:(Child_processes.Termination.create_pid_table ())
×
472
            ~commit_id:"not specified for unit tests" () )
473

474
    module Context = struct
475
      let logger = logger
476

477
      let precomputed_values = precomputed_values
478

479
      let constraint_constants = constraint_constants
480

481
      let consensus_constants = precomputed_values.consensus_constants
482

483
      let compile_config = compile_config
484
    end
485

486
    let downcast_breadcrumb breadcrumb =
487
      let transition =
×
488
        Transition_frontier.Breadcrumb.validated_transition breadcrumb
×
489
        |> Mina_block.Validated.remember
×
490
        |> Mina_block.Validation.reset_frontier_dependencies_validation
×
491
        |> Mina_block.Validation.reset_staged_ledger_diff_validation
492
      in
493
      Envelope.Incoming.wrap ~data:transition ~sender:Envelope.Sender.Local
×
494

495
    let%test_unit "adding transitions whose parents are in the frontier" =
496
      let frontier_size = 1 in
×
497
      let branch_size = 10 in
498
      let max_length = frontier_size + branch_size in
499
      Quickcheck.test ~trials:4
500
        (Transition_frontier.For_tests.gen_with_branch ~precomputed_values
×
501
           ~verifier ~max_length ~frontier_size ~branch_size () )
502
        ~f:(fun (frontier, branch) ->
503
          assert (
×
504
            Thread_safe.block_on_async_exn (fun () ->
×
505
                let valid_transition_reader, valid_transition_writer =
×
506
                  Strict_pipe.create
507
                    (Buffered
508
                       (`Capacity branch_size, `Overflow (Drop_head ignore)) )
509
                in
510
                let producer_transition_reader, _ =
×
511
                  Strict_pipe.create
512
                    (Buffered
513
                       (`Capacity branch_size, `Overflow (Drop_head ignore)) )
514
                in
515
                let _, catchup_job_writer =
×
516
                  Strict_pipe.create (Buffered (`Capacity 1, `Overflow Crash))
517
                in
518
                let catchup_breadcrumbs_reader, catchup_breadcrumbs_writer =
×
519
                  Strict_pipe.create (Buffered (`Capacity 1, `Overflow Crash))
520
                in
521
                let processed_transition_reader, processed_transition_writer =
×
522
                  Strict_pipe.create
523
                    (Buffered
524
                       (`Capacity branch_size, `Overflow (Drop_head ignore)) )
525
                in
526
                let clean_up_catchup_scheduler = Ivar.create () in
×
527
                let cache =
×
528
                  Unprocessed_transition_cache.create ~logger
529
                    ~cache_exceptions:true
530
                in
531
                run
532
                  ~context:(module Context)
533
                  ~time_controller ~verifier ~get_completed_work:(Fn.const None)
×
534
                  ~trust_system ~clean_up_catchup_scheduler ~frontier
535
                  ~primary_transition_reader:valid_transition_reader
536
                  ~producer_transition_reader ~catchup_job_writer
537
                  ~catchup_breadcrumbs_reader ~catchup_breadcrumbs_writer
538
                  ~processed_transition_writer ;
539
                List.iter branch ~f:(fun breadcrumb ->
540
                    let b =
×
541
                      downcast_breadcrumb breadcrumb
×
542
                      |> Unprocessed_transition_cache.register_exn cache
543
                    in
544
                    Strict_pipe.Writer.write valid_transition_writer
×
545
                      (`Block b, `Valid_cb None) ) ;
546
                match%map
547
                  Block_time.Timeout.await
×
548
                    ~timeout_duration:(Block_time.Span.of_ms 30000L)
×
549
                    time_controller
550
                    (Strict_pipe.Reader.fold_until processed_transition_reader
×
551
                       ~init:branch
552
                       ~f:(fun
553
                            remaining_breadcrumbs
554
                            (`Transition newly_added_transition, _, _)
555
                          ->
556
                         Deferred.return
×
557
                           ( match remaining_breadcrumbs with
558
                           | next_expected_breadcrumb :: tail ->
×
559
                               [%test_eq: State_hash.t]
×
560
                                 (Transition_frontier.Breadcrumb.state_hash
×
561
                                    next_expected_breadcrumb )
562
                                 (Mina_block.Validated.state_hash
×
563
                                    newly_added_transition ) ;
564
                               [%log info]
×
565
                                 ~metadata:
566
                                   [ ( "height"
567
                                     , `Int
568
                                         ( newly_added_transition
569
                                         |> Mina_block.Validated.forget
×
570
                                         |> With_hash.data |> Mina_block.header
×
571
                                         |> Mina_block.Header.protocol_state
×
572
                                         |> Protocol_state.consensus_state
×
573
                                         |> Consensus.Data.Consensus_state
574
                                            .blockchain_length
×
575
                                         |> Mina_numbers.Length.to_uint32
×
576
                                         |> Unsigned.UInt32.to_int ) )
×
577
                                   ]
578
                                 "transition of $height passed processor" ;
579
                               if List.is_empty tail then `Stop true
×
580
                               else `Continue tail
×
581
                           | [] ->
×
582
                               `Stop false ) ) )
583
                with
584
                | `Timeout ->
×
585
                    failwith "test timed out"
586
                | `Ok (`Eof _) ->
×
587
                    failwith "pipe closed unexpectedly"
588
                | `Ok (`Terminated x) ->
×
589
                    x ) ) )
590
  end )
2✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc