• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

MinaProtocol / mina / 2890

12 Nov 2024 12:30PM UTC coverage: 37.632% (-23.2%) from 60.813%
2890

push

buildkite

web-flow
Merge pull request #16333 from MinaProtocol/dkijania/port_new_deb_s3_dev

[DEV] Use new version of deb-s3 for validating job publishing

14488 of 38499 relevant lines covered (37.63%)

35927.1 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

39.76
/src/lib/network_pool/network_pool_base.ml
1
open Async_kernel
4✔
2
open Core_kernel
3
open Pipe_lib
4
open Network_peer
5

6
module Make (Transition_frontier : sig
7
  type t
8
end)
9
(Resource_pool : Intf.Resource_pool_intf
10
                   with type transition_frontier := Transition_frontier.t) :
11
  Intf.Network_pool_base_intf
12
    with type resource_pool := Resource_pool.t
13
     and type resource_pool_diff := Resource_pool.Diff.t
14
     and type resource_pool_diff_verified := Resource_pool.Diff.verified
15
     and type transition_frontier := Transition_frontier.t
16
     and type transition_frontier_diff := Resource_pool.transition_frontier_diff
17
     and type config := Resource_pool.Config.t
18
     and type rejected_diff := Resource_pool.Diff.rejected = struct
19
  (* Labels handed to the [O1trace] calls in this module; each one embeds
     [Resource_pool.label]. *)
  let apply_and_broadcast_thread_label =
    Printf.sprintf "apply_and_broadcast_%s_diffs" Resource_pool.label

  let processing_diffs_thread_label =
    Printf.sprintf "processing_%s_diffs" Resource_pool.label

  let processing_transition_frontier_diffs_thread_label =
    Printf.sprintf "processing_%s_transition_frontier_diffs" Resource_pool.label

  let rebroadcast_loop_thread_label =
    Printf.sprintf "%s_rebroadcast_loop" Resource_pool.label
29

30
  (* Callback used to report the outcome of processing a pool diff.

     [Local f] carries a function that receives either an error or a triple of
     broadcast status, accepted diff and rejected diff. [External cb] carries a
     [Mina_net2.Validation_callback.t] that is fired with `Accept, `Reject or
     `Ignore. *)
  module Broadcast_callback = struct
    type resource_pool_diff = Resource_pool.Diff.t

    type rejected_diff = Resource_pool.Diff.rejected

    type t =
      | Local of
          (   ( [ `Broadcasted | `Not_broadcasted ]
              * Resource_pool.Diff.t
              * Resource_pool.Diff.rejected )
              Or_error.t
           -> unit )
      | External of Mina_net2.Validation_callback.t

    (* A [Local] callback is never considered expired; an [External] one
       defers to [Mina_net2.Validation_callback.is_expired]. *)
    let is_expired callback =
      match callback with
      | External validation_cb ->
          Mina_net2.Validation_callback.is_expired validation_cb
      | Local _ ->
          false

    open Mina_net2.Validation_callback

    (* Report a failure: [Local] receives [Error err]; [External] is fired with
       `Reject (the error value itself is not passed on in that case). *)
    let error err callback =
      match callback with
      | External validation_cb ->
          fire_if_not_already_fired validation_cb `Reject
      | Local notify ->
          notify (Error err)

    (* Report a rejected diff: [Local] receives the accepted/rejected parts
       tagged `Not_broadcasted; [External] is fired with `Reject. *)
    let reject accepted rejected callback =
      match callback with
      | External validation_cb ->
          fire_if_not_already_fired validation_cb `Reject
      | Local notify ->
          notify (Ok (`Not_broadcasted, accepted, rejected))

    (* Report a diff that is not forwarded: [Local] receives the same payload as
       in [reject]; [External] is fired with `Ignore. *)
    let drop accepted rejected callback =
      match callback with
      | External validation_cb ->
          fire_if_not_already_fired validation_cb `Ignore
      | Local notify ->
          notify (Ok (`Not_broadcasted, accepted, rejected))

    (* Report a diff that is forwarded. For [Local], the callback is notified
       with `Broadcasted and then [accepted] is written to [broadcast_pipe] with
       nonce 0, without waiting for the write to complete. For [External], the
       validation callback is fired with `Accept and nothing is written to
       [broadcast_pipe] here. *)
    let forward broadcast_pipe accepted rejected callback =
      match callback with
      | External validation_cb ->
          fire_if_not_already_fired validation_cb `Accept
      | Local notify ->
          notify (Ok (`Broadcasted, accepted, rejected)) ;
          don't_wait_for
            (Linear_pipe.write broadcast_pipe
               { With_nonce.message = accepted; nonce = 0 } )
  end
79

80
  (* [Pool_sink.Remote_sink] instantiated with this pool's diff module
     (extended with [label] and [pool]) and with [Broadcast_callback].
     [Remote_sink.create] is called in [of_resource_pool_and_diffs], where it
     yields a reader, a writer and a rate limiter. *)
  module Remote_sink =
81
    Pool_sink.Remote_sink
82
      (struct
83
        include Resource_pool.Diff
84

85
        let label = Resource_pool.label
86

87
        type pool = Resource_pool.t
88
      end)
89
      (Broadcast_callback)
90

91
  (* [Pool_sink.Local_sink] instantiated with this pool's diff module
     (extended with [label] and [pool]) and with [Broadcast_callback].
     [Local_sink.create] is called in [of_resource_pool_and_diffs]; the third
     component of its result is ignored there. *)
  module Local_sink =
92
    Pool_sink.Local_sink
93
      (struct
94
        include Resource_pool.Diff
95

96
        let label = Resource_pool.label
97

98
        type pool = Resource_pool.t
99
      end)
100
      (Broadcast_callback)
101

102
  (* State of a network pool: the underlying resource pool, a logger, and both
     ends of the pipe carrying diffs to broadcast ([write_broadcasts] is written
     by [Broadcast_callback.forward] and [rebroadcast_loop]; [read_broadcasts]
     is exposed through [broadcasts]). [constraint_constants] and
     [block_window_duration] are stored here but not read by the code visible in
     this module. *)
  type t =
103
    { resource_pool : Resource_pool.t
104
    ; logger : Logger.t
105
    ; write_broadcasts : Resource_pool.Diff.t With_nonce.t Linear_pipe.Writer.t
106
    ; read_broadcasts : Resource_pool.Diff.t With_nonce.t Linear_pipe.Reader.t
107
    ; constraint_constants : Genesis_constants.Constraint_constants.t
108
    ; block_window_duration : Time.Span.t
109
    }
110

111
  (* The resource pool held by the network pool. *)
  let resource_pool t = t.resource_pool

  (* The reader end of the pipe of diffs to broadcast. *)
  let broadcasts t = t.read_broadcasts
×
114

115
  (* Creates a rate limiter whose capacity is
     [Resource_pool.Diff.max_per_15_seconds] per 15-second span. *)
  let create_rate_limiter () =
    let span = Time.Span.of_sec 15.0 in
    let capacity = (Resource_pool.Diff.max_per_15_seconds, `Per span) in
    Rate_limiter.create ~capacity
×
119

120
  (* Applies a verified diff to the resource pool and reports the result through
     [cb], inside an [O1trace.sync_thread] labelled
     [apply_and_broadcast_thread_label].

     - [Ok (`Accept, ...)]: logged as "accepted", then rebroadcast.
     - [Ok (`Reject, ...)]: logged as "rejected" (reason "not_applied"), then
       [Broadcast_callback.reject].
     - [Error (`Locally_generated res)]: logged as "rejected" (reason
       "locally_generated"), then [res] is rebroadcast.
     - [Error (`Other e)]: the error is logged, then [Broadcast_callback.error].

     Rebroadcasting forwards a non-empty diff to [t.write_broadcasts] and drops
     an empty one. *)
  let apply_and_broadcast ({ logger; _ } as t)
      (diff : Resource_pool.Diff.verified Envelope.Incoming.t) cb =
    let env = Envelope.Incoming.map ~f:Resource_pool.Diff.t_of_verified diff in
    let rebroadcast (diff', rejected) =
      if not (Resource_pool.Diff.is_empty diff') then (
        [%log debug] "Rebroadcasting diff %s" (Resource_pool.Diff.summary diff') ;
        Broadcast_callback.forward t.write_broadcasts diff' rejected cb )
      else (
        (* The summary logged here is that of the incoming diff, not [diff']. *)
        [%log trace]
          "Refusing to rebroadcast $diff. Pool diff apply feedback: empty diff"
          ~metadata:
            [ ( "diff"
              , `String Resource_pool.Diff.(summary @@ t_of_verified diff.data)
              )
            ] ;
        Broadcast_callback.drop diff' rejected cb )
    in
    let apply () =
      match Resource_pool.Diff.unsafe_apply t.resource_pool diff with
      | Error (`Other e) ->
          [%log debug]
            "Refusing to rebroadcast. Pool diff apply feedback: $error"
            ~metadata:[ ("error", Error_json.error_to_yojson e) ] ;
          Resource_pool.Diff.log_internal ~logger "rejected" env ;
          Broadcast_callback.error e cb
      | Error (`Locally_generated res) ->
          Resource_pool.Diff.log_internal ~logger "rejected"
            ~reason:"locally_generated" env ;
          rebroadcast res
      | Ok (`Reject, accepted, rejected) ->
          Resource_pool.Diff.log_internal ~logger "rejected"
            ~reason:"not_applied" env ;
          Broadcast_callback.reject accepted rejected cb
      | Ok (`Accept, accepted, rejected) ->
          Resource_pool.Diff.log_internal ~logger "accepted" env ;
          rebroadcast (accepted, rejected)
    in
    O1trace.sync_thread apply_and_broadcast_thread_label apply
×
157

158
  (* Schedules, via [every] with a one-minute span, a debug log entry carrying
     [Rate_limiter.summary rl] and prefixed with [Resource_pool.label]. *)
  let log_rate_limiter_occasionally t rl =
    let period = Time_ns.Span.of_min 1. in
    let log_summary () =
      [%log' debug t.logger]
        ~metadata:[ ("rate_limiter", Rate_limiter.summary rl) ]
        !"%s $rate_limiter" Resource_pool.label
    in
    every period log_summary
164

165
  (* Message type of the merged stream consumed in
     [of_resource_pool_and_diffs]: either a verified pool diff paired with its
     broadcast callback, or a transition frontier diff. *)
  type wrapped_t =
166
    | Diff of
167
        (Resource_pool.Diff.verified Envelope.Incoming.t * Broadcast_callback.t)
168
    | Transition_frontier_extension of Resource_pool.transition_frontier_diff
169

170
  (* Builds a network pool around [resource_pool] and starts the background
     consumer that processes incoming messages.

     A broadcast pipe is created, a remote and a local sink are created (each
     yielding a reader and a writer; the remote one also yields a rate limiter
     whose summary is logged periodically), and the transition frontier diff
     reader, the remote reader and the local reader are merged into one stream.
     Pool diffs from that stream go through [apply_and_broadcast]; transition
     frontier diffs go to [Resource_pool.handle_transition_frontier_diff].

     Returns [(network_pool, remote_sink_writer, local_sink_writer)].

     The sinks are given [unwrap], which raises [Failure] if handed a
     [Transition_frontier_extension] message. *)
  let of_resource_pool_and_diffs resource_pool ~logger ~constraint_constants
      ~tf_diffs ~log_gossip_heard ~on_remote_push ~block_window_duration =
    let read_broadcasts, write_broadcasts = Linear_pipe.create () in
    let network_pool =
      { resource_pool
      ; logger
      ; read_broadcasts
      ; write_broadcasts
      ; constraint_constants
      ; block_window_duration
      }
    in
    (* Shared by both sinks. The non-[Diff] case is spelled out (rather than
       using a wildcard) so that adding a constructor to [wrapped_t] produces a
       non-exhaustive-match warning here. *)
    let wrap m = Diff m in
    let unwrap = function
      | Diff m ->
          m
      | Transition_frontier_extension _ ->
          failwith "unexpected message type"
    in
    let remote_r, remote_w, remote_rl =
      Remote_sink.create ~log_gossip_heard ~on_push:on_remote_push ~wrap ~unwrap
        ~trace_label:Resource_pool.label ~logger resource_pool
    in
    let local_r, local_w, _ =
      Local_sink.create ~wrap ~unwrap ~trace_label:Resource_pool.label ~logger
        resource_pool
    in
    log_rate_limiter_occasionally network_pool remote_rl ;
    (*priority: Transition frontier diffs > local diffs > incoming diffs*)
    (* NOTE(review): the readers below are listed as transition frontier,
       remote, local. If [Merge.iter_sync] prioritises by list position, remote
       diffs would outrank local ones, which disagrees with the comment above -
       confirm against [Strict_pipe.Reader.Merge]. *)
    Deferred.don't_wait_for
      (O1trace.thread Resource_pool.label (fun () ->
           Strict_pipe.Reader.Merge.iter_sync
             [ Strict_pipe.Reader.map tf_diffs ~f:(fun diff ->
                   Transition_frontier_extension diff )
             ; remote_r
             ; local_r
             ]
             ~f:(fun diff_source ->
               match diff_source with
               | Diff ((verified_diff, cb) : Remote_sink.unwrapped_t) ->
                   O1trace.sync_thread processing_diffs_thread_label (fun () ->
                       apply_and_broadcast network_pool verified_diff cb )
               | Transition_frontier_extension diff ->
                   O1trace.sync_thread
                     processing_transition_frontier_diffs_thread_label
                     (fun () ->
                       Resource_pool.handle_transition_frontier_diff diff
                         resource_pool ) ) ) ) ;
    (network_pool, remote_w, local_w)
12✔
218

219
  (* Rebroadcast locally generated pool items every 10 minutes. Do so for 50
220
     minutes - at most 5 rebroadcasts - before giving up.
221

222
     The goal here is to be resilient to short term network failures and
223
     partitions. Note that with gossip we don't know anything about the state of
224
     other peers' pools (we know if something made it into a block, but that can
225
     take a long time and it's possible for things to be successfully received
226
     but never used in a block), so in a healthy network all repetition is spam.
227
     We need to balance reliability with efficiency. Exponential "backoff" would
228
     be better, but it'd complicate the interface between this module and the
229
     specific pool implementations.
230
  *)
231
  (* Never-ending loop: on each pass, asks the resource pool for its
     rebroadcastable diffs (passing [has_timed_out], which reports `Timed_out
     once a timestamp is older than five rebroadcast intervals), logs what was
     found, writes every diff to [t.write_broadcasts] tagged with a nonce taken
     from the current time in nanoseconds, then waits one interval (10 minutes)
     before the next pass. *)
  let rebroadcast_loop : t -> Logger.t -> unit Deferred.t =
   fun t logger ->
    let rebroadcast_interval = Time.Span.of_min 10. in
    let rebroadcast_window = Time.Span.scale rebroadcast_interval 5. in
    let has_timed_out timestamp =
      let deadline = Time.add timestamp rebroadcast_window in
      if Time.( < ) deadline (Time.now ()) then `Timed_out else `Ok
    in
    let rec loop () =
      let pending =
        Resource_pool.get_rebroadcastable t.resource_pool ~has_timed_out
      in
      ( match pending with
      | [] ->
          [%log trace] "Nothing to rebroadcast"
      | _ :: _ ->
          [%log debug]
            "Preparing to rebroadcast locally generated resource pool diffs \
             $diffs"
            ~metadata:
              [ ("count", `Int (List.length pending))
              ; ( "diffs"
                , `List
                    (List.map pending ~f:(fun d ->
                         `String (Resource_pool.Diff.summary d) ) ) )
              ] ) ;
      (* All diffs written during one pass share the same nonce. *)
      let nonce = Time_ns.to_int_ns_since_epoch (Time_ns.now ()) in
      let%bind () =
        Deferred.List.iter pending ~f:(fun message ->
            Linear_pipe.write t.write_broadcasts With_nonce.{ message; nonce } )
      in
      let%bind () = Async.after rebroadcast_interval in
      loop ()
    in
    loop ()
265

266
  (* Creates a network pool together with its resource pool.

     A synchronous strict pipe is created for transition frontier diffs; its
     writer is given to [Resource_pool.create] and its reader to
     [of_resource_pool_and_diffs]. The rebroadcast loop is then started as an
     [O1trace] background thread.

     Returns [(t, remote_sink, local_sink)], in the same order as the tuple
     produced by [of_resource_pool_and_diffs]. *)
  let create ~config ~constraint_constants ~consensus_constants ~time_controller
      ~frontier_broadcast_pipe ~logger ~log_gossip_heard ~on_remote_push
      ~block_window_duration =
    (* Diffs from transition frontier extensions *)
    let tf_diff_reader, tf_diff_writer =
      Strict_pipe.(
        create ~name:"Network pool transition frontier diffs" Synchronous)
    in
    (* [of_resource_pool_and_diffs] returns the remote sink before the local
       one; the bindings are named accordingly. *)
    let t, remote_sink, local_sink =
      of_resource_pool_and_diffs
        (Resource_pool.create ~constraint_constants ~consensus_constants
           ~time_controller ~config ~logger ~frontier_broadcast_pipe
           ~tf_diff_writer )
        ~constraint_constants ~logger ~tf_diffs:tf_diff_reader ~log_gossip_heard
        ~on_remote_push ~block_window_duration
    in
    O1trace.background_thread rebroadcast_loop_thread_label (fun () ->
        rebroadcast_loop t logger ) ;
    (t, remote_sink, local_sink)
12✔
285
end
4✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc