• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

MinaProtocol / mina / 767

04 Nov 2025 01:59PM UTC coverage: 32.374% (-4.5%) from 36.902%
767

push

buildkite

web-flow
Merge pull request #18063 from MinaProtocol/lyh/compat-into-dev-nov4-2025

Merge compatible into develop Nov. 4th 2025

87 of 228 new or added lines in 10 files covered. (38.16%)

3416 existing lines in 136 files now uncovered.

23591 of 72871 relevant lines covered (32.37%)

26590.67 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

2.15
/src/lib/network_pool/network_pool_base.ml
1
open Async_kernel
122✔
2
open Core_kernel
3
open Pipe_lib
4
open Network_peer
5

6
module Make (Transition_frontier : sig
7
  type t
8
end)
9
(Resource_pool : Intf.Resource_pool_intf
10
                   with type transition_frontier := Transition_frontier.t) :
11
  Intf.Network_pool_base_intf
12
    with type resource_pool := Resource_pool.t
13
     and type resource_pool_diff := Resource_pool.Diff.t
14
     and type resource_pool_diff_verified := Resource_pool.Diff.verified
15
     and type transition_frontier := Transition_frontier.t
16
     and type transition_frontier_diff := Resource_pool.transition_frontier_diff
17
     and type config := Resource_pool.Config.t
18
     and type rejected_diff := Resource_pool.Diff.rejected = struct
19
  let apply_and_broadcast_thread_label =
20
    "apply_and_broadcast_" ^ Resource_pool.label ^ "_diffs"
21

22
  let apply_no_broadcast_thread_label =
23
    "apply_no_broadcast_" ^ Resource_pool.label ^ "_diffs"
24

25
  let processing_diffs_thread_label =
26
    "processing_" ^ Resource_pool.label ^ "_diffs"
27

28
  let processing_transition_frontier_diffs_thread_label =
29
    "processing_" ^ Resource_pool.label ^ "_transition_frontier_diffs"
30

31
  let rebroadcast_loop_thread_label = Resource_pool.label ^ "_rebroadcast_loop"
32

33
  module Broadcast_callback = struct
34
    type resource_pool_diff = Resource_pool.Diff.t
35

36
    type rejected_diff = Resource_pool.Diff.rejected
37

38
    type t =
39
      | Local of
40
          (   ( [ `Broadcasted | `Not_broadcasted ]
41
              * Resource_pool.Diff.t
42
              * Resource_pool.Diff.rejected )
43
              Or_error.t
44
           -> unit )
45
      | External of Mina_net2.Validation_callback.t
46

47
    let is_expired = function
48
      | Local _ ->
×
49
          false
50
      | External cb ->
×
51
          Mina_net2.Validation_callback.is_expired cb
52

53
    open Mina_net2.Validation_callback
54

55
    let error err = function
56
      | Local f ->
×
57
          f (Error err)
58
      | External cb ->
×
59
          fire_if_not_already_fired cb `Reject
60

61
    let reject accepted rejected = function
62
      | Local f ->
×
63
          f (Ok (`Not_broadcasted, accepted, rejected))
64
      | External cb ->
×
65
          fire_if_not_already_fired cb `Reject
66

67
    let drop accepted rejected = function
68
      | Local f ->
×
69
          f (Ok (`Not_broadcasted, accepted, rejected))
70
      | External cb ->
×
71
          fire_if_not_already_fired cb `Ignore
72

73
    let forward broadcast_pipe accepted rejected = function
74
      | Local f ->
×
75
          f (Ok (`Broadcasted, accepted, rejected)) ;
76
          Linear_pipe.write broadcast_pipe
×
77
            { With_nonce.message = accepted; nonce = 0 }
78
          |> don't_wait_for
79
      | External cb ->
×
80
          fire_if_not_already_fired cb `Accept
81
  end
82

83
  module Remote_sink =
84
    Pool_sink.Remote_sink
85
      (struct
86
        include Resource_pool.Diff
87

88
        let label = Resource_pool.label
89

90
        type pool = Resource_pool.t
91
      end)
92
      (Broadcast_callback)
93

94
  module Local_sink =
95
    Pool_sink.Local_sink
96
      (struct
97
        include Resource_pool.Diff
98

99
        let label = Resource_pool.label
100

101
        type pool = Resource_pool.t
102
      end)
103
      (Broadcast_callback)
104

105
  type t =
106
    { resource_pool : Resource_pool.t
107
    ; logger : Logger.t
108
    ; write_broadcasts : Resource_pool.Diff.t With_nonce.t Linear_pipe.Writer.t
109
    ; read_broadcasts : Resource_pool.Diff.t With_nonce.t Linear_pipe.Reader.t
110
    ; constraint_constants : Genesis_constants.Constraint_constants.t
111
    ; block_window_duration : Time.Span.t
112
    }
113

114
  let resource_pool { resource_pool; _ } = resource_pool
×
115

116
  let broadcasts { read_broadcasts; _ } = read_broadcasts
×
117

118
  let create_rate_limiter () =
119
    Rate_limiter.create
×
120
      ~capacity:
121
        (Resource_pool.Diff.max_per_15_seconds, `Per (Time.Span.of_sec 15.0))
×
122

123
  let apply_and_broadcast ({ logger; _ } as t)
124
      (diff : Resource_pool.Diff.verified Envelope.Incoming.t) cb =
125
    let env = Envelope.Incoming.map ~f:Resource_pool.Diff.t_of_verified diff in
×
126
    let rebroadcast (diff', rejected) =
×
127
      let open Broadcast_callback in
×
128
      if Resource_pool.Diff.is_empty diff' then (
×
129
        [%log trace]
×
130
          "Refusing to rebroadcast $diff. Pool diff apply feedback: empty diff"
131
          ~metadata:
132
            [ ( "diff"
133
              , `String Resource_pool.Diff.(summary @@ t_of_verified diff.data)
×
134
              )
135
            ] ;
136
        drop diff' rejected cb )
×
137
      else (
×
138
        [%log debug] "Rebroadcasting diff %s" (Resource_pool.Diff.summary diff') ;
×
139
        forward t.write_broadcasts diff' rejected cb )
×
140
    in
141
    O1trace.sync_thread apply_and_broadcast_thread_label (fun () ->
142
        match Resource_pool.Diff.unsafe_apply t.resource_pool diff with
×
143
        | Ok (`Accept, accepted, rejected) ->
×
144
            Resource_pool.Diff.log_internal ~logger "accepted" env ;
145
            rebroadcast (accepted, rejected)
×
146
        | Ok (`Reject, accepted, rejected) ->
×
147
            Resource_pool.Diff.log_internal ~logger "rejected"
148
              ~reason:"not_applied" env ;
149
            Broadcast_callback.reject accepted rejected cb
×
150
        | Error (`Locally_generated res) ->
×
151
            Resource_pool.Diff.log_internal ~logger "rejected"
152
              ~reason:"locally_generated" env ;
153
            rebroadcast res
×
154
        | Error (`Other e) ->
×
155
            [%log' debug t.logger]
×
156
              "Refusing to rebroadcast. Pool diff apply feedback: $error"
157
              ~metadata:[ ("error", Error_json.error_to_yojson e) ] ;
×
158
            Resource_pool.Diff.log_internal ~logger "rejected" env ;
×
159
            Broadcast_callback.error e cb )
×
160

161
  let apply_no_broadcast ({ logger; _ } as t)
162
      (diff : Resource_pool.Diff.verified Envelope.Incoming.t) =
163
    let env = Envelope.Incoming.map ~f:Resource_pool.Diff.t_of_verified diff in
×
164
    O1trace.sync_thread apply_no_broadcast_thread_label (fun () ->
×
165
        match Resource_pool.Diff.unsafe_apply t.resource_pool diff with
×
166
        | Ok (`Accept, _accepted, _rejected) ->
×
167
            Resource_pool.Diff.log_internal ~logger "accepted" env
168
        | Ok (`Reject, _accepted, _rejected) ->
×
169
            Resource_pool.Diff.log_internal ~logger "rejected"
170
              ~reason:"not_applied" env
171
        | Error (`Locally_generated _res) ->
×
172
            Resource_pool.Diff.log_internal ~logger "rejected"
173
              ~reason:"locally_generated" env
174
        | Error (`Other e) ->
×
175
            [%log' debug t.logger] "Pool diff apply feedback: $error"
×
176
              ~metadata:[ ("error", Error_json.error_to_yojson e) ] ;
×
177
            Resource_pool.Diff.log_internal ~logger "rejected" env )
×
178

179
  let log_rate_limiter_occasionally t rl =
UNCOV
180
    let time = Time_ns.Span.of_min 1. in
×
UNCOV
181
    every time (fun () ->
×
UNCOV
182
        [%log' debug t.logger]
×
UNCOV
183
          ~metadata:[ ("rate_limiter", Rate_limiter.summary rl) ]
×
184
          !"%s $rate_limiter" Resource_pool.label )
185

186
  type wrapped_t =
187
    | Diff of
188
        (Resource_pool.Diff.verified Envelope.Incoming.t * Broadcast_callback.t)
189
    | Transition_frontier_extension of Resource_pool.transition_frontier_diff
190

191
  let of_resource_pool_and_diffs resource_pool ~logger ~constraint_constants
192
      ~tf_diffs ~log_gossip_heard ~on_remote_push ~block_window_duration =
UNCOV
193
    let read_broadcasts, write_broadcasts = Linear_pipe.create () in
×
UNCOV
194
    let network_pool =
×
195
      { resource_pool
196
      ; logger
197
      ; read_broadcasts
198
      ; write_broadcasts
199
      ; constraint_constants
200
      ; block_window_duration
201
      }
202
    in
203
    let remote_r, remote_w, remote_rl =
204
      Remote_sink.create ~log_gossip_heard ~on_push:on_remote_push
205
        ~wrap:(fun m -> Diff m)
×
206
        ~unwrap:(function
207
          | Diff m -> m | _ -> failwith "unexpected message type" )
×
208
        ~trace_label:Resource_pool.label ~logger resource_pool
209
    in
UNCOV
210
    let local_r, local_w, _ =
×
211
      Local_sink.create
212
        ~wrap:(fun m -> Diff m)
×
213
        ~unwrap:(function
214
          | Diff m -> m | _ -> failwith "unexpected message type" )
×
215
        ~trace_label:Resource_pool.label ~logger resource_pool
216
    in
UNCOV
217
    log_rate_limiter_occasionally network_pool remote_rl ;
×
218
    (*priority: Transition frontier diffs > local diffs > incoming diffs*)
UNCOV
219
    Deferred.don't_wait_for
×
UNCOV
220
      (O1trace.thread Resource_pool.label (fun () ->
×
UNCOV
221
           Strict_pipe.Reader.Merge.iter_sync
×
UNCOV
222
             [ Strict_pipe.Reader.map tf_diffs ~f:(fun diff ->
×
UNCOV
223
                   Transition_frontier_extension diff )
×
224
             ; remote_r
225
             ; local_r
226
             ]
227
             ~f:(fun diff_source ->
UNCOV
228
               match diff_source with
×
229
               | Diff ((verified_diff, cb) : Remote_sink.unwrapped_t) ->
×
230
                   O1trace.sync_thread processing_diffs_thread_label (fun () ->
231
                       apply_and_broadcast network_pool verified_diff cb )
×
UNCOV
232
               | Transition_frontier_extension diff ->
×
233
                   O1trace.sync_thread
234
                     processing_transition_frontier_diffs_thread_label
235
                     (fun () ->
UNCOV
236
                       Resource_pool.handle_transition_frontier_diff diff
×
237
                         resource_pool ) ) ) ) ;
UNCOV
238
    (network_pool, remote_w, local_w)
×
239

240
  (* Rebroadcast locally generated pool items every 10 minutes. Do so for 50
241
     minutes - at most 5 rebroadcasts - before giving up.
242

243
     The goal here is to be resilient to short term network failures and
244
     partitions. Note that with gossip we don't know anything about the state of
245
     other peers' pools (we know if something made it into a block, but that can
246
     take a long time and it's possible for things to be successfully received
247
     but never used in a block), so in a healthy network all repetition is spam.
248
     We need to balance reliability with efficiency. Exponential "backoff" would
249
     be better, but it'd complicate the interface between this module and the
250
     specific pool implementations.
251
  *)
252
  let rebroadcast_loop : t -> Logger.t -> unit Deferred.t =
253
   fun t logger ->
UNCOV
254
    let rebroadcast_interval = Time.Span.of_min 10. in
×
UNCOV
255
    let rebroadcast_window = Time.Span.scale rebroadcast_interval 5. in
×
UNCOV
256
    let has_timed_out time =
×
257
      if Time.(add time rebroadcast_window < now ()) then `Timed_out else `Ok
×
258
    in
259
    let rec go () =
UNCOV
260
      let rebroadcastable =
×
261
        Resource_pool.get_rebroadcastable t.resource_pool ~has_timed_out
262
      in
UNCOV
263
      if List.is_empty rebroadcastable then
×
UNCOV
264
        [%log trace] "Nothing to rebroadcast"
×
265
      else
266
        [%log debug]
×
267
          "Preparing to rebroadcast locally generated resource pool diffs \
268
           $diffs"
269
          ~metadata:
270
            [ ("count", `Int (List.length rebroadcastable))
×
271
            ; ( "diffs"
272
              , `List
273
                  (List.map
×
274
                     ~f:(fun d -> `String (Resource_pool.Diff.summary d))
×
275
                     rebroadcastable ) )
276
            ] ;
UNCOV
277
      let nonce = Time_ns.to_int_ns_since_epoch (Time_ns.now ()) in
×
278
      let%bind () =
UNCOV
279
        Deferred.List.iter rebroadcastable ~f:(fun message ->
×
280
            Linear_pipe.write t.write_broadcasts With_nonce.{ message; nonce } )
×
281
      in
UNCOV
282
      let%bind () = Async.after rebroadcast_interval in
×
283
      go ()
×
284
    in
285
    go ()
286

287
  let create ~config ~constraint_constants ~consensus_constants ~time_controller
288
      ~frontier_broadcast_pipe ~logger ~log_gossip_heard ~on_remote_push
289
      ~block_window_duration =
290
    (* Diffs from transition frontier extensions *)
UNCOV
291
    let tf_diff_reader, tf_diff_writer =
×
292
      Strict_pipe.(
UNCOV
293
        create ~name:"Network pool transition frontier diffs" Synchronous)
×
294
    in
295
    let t, remotes, locals =
296
      of_resource_pool_and_diffs
297
        (Resource_pool.create ~constraint_constants ~consensus_constants
298
           ~time_controller ~config ~logger ~frontier_broadcast_pipe
299
           ~tf_diff_writer )
300
        ~constraint_constants ~logger ~tf_diffs:tf_diff_reader ~log_gossip_heard
301
        ~on_remote_push ~block_window_duration
302
    in
UNCOV
303
    O1trace.background_thread rebroadcast_loop_thread_label (fun () ->
×
UNCOV
304
        rebroadcast_loop t logger ) ;
×
UNCOV
305
    (t, remotes, locals)
×
306
end
122✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc