• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

MinaProtocol / mina / 2787

21 Oct 2024 06:35PM UTC coverage: 61.095% (-0.02%) from 61.111%
2787

push

buildkite

web-flow
Merge pull request #16168 from MinaProtocol/dkijania/dhall_checks

Introduce additional checks for dhall (deps, dirtyWhen)

47168 of 77204 relevant lines covered (61.1%)

569070.52 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

69.88
/src/lib/network_pool/network_pool_base.ml
1
open Async_kernel
23✔
2
open Core_kernel
3
open Pipe_lib
4
open Network_peer
5

6
module Make (Transition_frontier : sig
7
  type t
8
end)
9
(Resource_pool : Intf.Resource_pool_intf
10
                   with type transition_frontier := Transition_frontier.t) :
11
  Intf.Network_pool_base_intf
12
    with type resource_pool := Resource_pool.t
13
     and type resource_pool_diff := Resource_pool.Diff.t
14
     and type resource_pool_diff_verified := Resource_pool.Diff.verified
15
     and type transition_frontier := Transition_frontier.t
16
     and type transition_frontier_diff := Resource_pool.transition_frontier_diff
17
     and type config := Resource_pool.Config.t
18
     and type rejected_diff := Resource_pool.Diff.rejected = struct
19
  let apply_and_broadcast_thread_label =
20
    "apply_and_broadcast_" ^ Resource_pool.label ^ "_diffs"
21

22
  let processing_diffs_thread_label =
23
    "processing_" ^ Resource_pool.label ^ "_diffs"
24

25
  let processing_transition_frontier_diffs_thread_label =
26
    "processing_" ^ Resource_pool.label ^ "_transition_frontier_diffs"
27

28
  let rebroadcast_loop_thread_label = Resource_pool.label ^ "_rebroadcast_loop"
29

30
  module Broadcast_callback = struct
31
    type resource_pool_diff = Resource_pool.Diff.t
32

33
    type rejected_diff = Resource_pool.Diff.rejected
34

35
    type t =
36
      | Local of
37
          (   ( [ `Broadcasted | `Not_broadcasted ]
38
              * Resource_pool.Diff.t
39
              * Resource_pool.Diff.rejected )
40
              Or_error.t
41
           -> unit )
42
      | External of Mina_net2.Validation_callback.t
43

44
    let is_expired = function
45
      | Local _ ->
13✔
46
          false
47
      | External cb ->
10✔
48
          Mina_net2.Validation_callback.is_expired cb
49

50
    open Mina_net2.Validation_callback
51

52
    let error err = function
53
      | Local f ->
12✔
54
          f (Error err)
55
      | External cb ->
10✔
56
          fire_if_not_already_fired cb `Reject
57

58
    let reject accepted rejected = function
59
      | Local f ->
×
60
          f (Ok (`Not_broadcasted, accepted, rejected))
61
      | External cb ->
×
62
          fire_if_not_already_fired cb `Reject
63

64
    let drop accepted rejected = function
65
      | Local f ->
×
66
          f (Ok (`Not_broadcasted, accepted, rejected))
67
      | External cb ->
×
68
          fire_if_not_already_fired cb `Ignore
69

70
    let forward broadcast_pipe accepted rejected = function
71
      | Local f ->
3✔
72
          f (Ok (`Broadcasted, accepted, rejected)) ;
73
          Linear_pipe.write broadcast_pipe
3✔
74
            { With_nonce.message = accepted; nonce = 0 }
75
          |> don't_wait_for
76
      | External cb ->
×
77
          fire_if_not_already_fired cb `Accept
78
  end
79

80
  module Remote_sink =
81
    Pool_sink.Remote_sink
82
      (struct
83
        include Resource_pool.Diff
84

85
        let label = Resource_pool.label
86

87
        type pool = Resource_pool.t
88
      end)
89
      (Broadcast_callback)
90

91
  module Local_sink =
92
    Pool_sink.Local_sink
93
      (struct
94
        include Resource_pool.Diff
95

96
        let label = Resource_pool.label
97

98
        type pool = Resource_pool.t
99
      end)
100
      (Broadcast_callback)
101

102
  type t =
103
    { resource_pool : Resource_pool.t
104
    ; logger : Logger.t
105
    ; write_broadcasts : Resource_pool.Diff.t With_nonce.t Linear_pipe.Writer.t
106
    ; read_broadcasts : Resource_pool.Diff.t With_nonce.t Linear_pipe.Reader.t
107
    ; constraint_constants : Genesis_constants.Constraint_constants.t
108
    ; block_window_duration : Time.Span.t
109
    }
110

111
  let resource_pool { resource_pool; _ } = resource_pool
573✔
112

113
  let broadcasts { read_broadcasts; _ } = read_broadcasts
7✔
114

115
  let create_rate_limiter () =
116
    Rate_limiter.create
×
117
      ~capacity:
118
        (Resource_pool.Diff.max_per_15_seconds, `Per (Time.Span.of_sec 15.0))
×
119

120
  let apply_and_broadcast ({ logger; _ } as t)
121
      (diff : Resource_pool.Diff.verified Envelope.Incoming.t) cb =
122
    let env = Envelope.Incoming.map ~f:Resource_pool.Diff.t_of_verified diff in
4✔
123
    let rebroadcast (diff', rejected) =
4✔
124
      let open Broadcast_callback in
3✔
125
      if Resource_pool.Diff.is_empty diff' then (
×
126
        [%log trace]
×
127
          "Refusing to rebroadcast $diff. Pool diff apply feedback: empty diff"
128
          ~metadata:
129
            [ ( "diff"
130
              , `String Resource_pool.Diff.(summary @@ t_of_verified diff.data)
×
131
              )
132
            ] ;
133
        drop diff' rejected cb )
×
134
      else (
3✔
135
        [%log debug] "Rebroadcasting diff %s" (Resource_pool.Diff.summary diff') ;
3✔
136
        forward t.write_broadcasts diff' rejected cb )
3✔
137
    in
138
    O1trace.sync_thread apply_and_broadcast_thread_label (fun () ->
139
        match Resource_pool.Diff.unsafe_apply t.resource_pool diff with
4✔
140
        | Ok (`Accept, accepted, rejected) ->
3✔
141
            Resource_pool.Diff.log_internal ~logger "accepted" env ;
142
            rebroadcast (accepted, rejected)
3✔
143
        | Ok (`Reject, accepted, rejected) ->
×
144
            Resource_pool.Diff.log_internal ~logger "rejected"
145
              ~reason:"not_applied" env ;
146
            Broadcast_callback.reject accepted rejected cb
×
147
        | Error (`Locally_generated res) ->
×
148
            Resource_pool.Diff.log_internal ~logger "rejected"
149
              ~reason:"locally_generated" env ;
150
            rebroadcast res
×
151
        | Error (`Other e) ->
1✔
152
            [%log' debug t.logger]
1✔
153
              "Refusing to rebroadcast. Pool diff apply feedback: $error"
154
              ~metadata:[ ("error", Error_json.error_to_yojson e) ] ;
1✔
155
            Resource_pool.Diff.log_internal ~logger "rejected" env ;
1✔
156
            Broadcast_callback.error e cb )
1✔
157

158
  let log_rate_limiter_occasionally t rl =
159
    let time = Time_ns.Span.of_min 1. in
588✔
160
    every time (fun () ->
588✔
161
        [%log' debug t.logger]
1,890✔
162
          ~metadata:[ ("rate_limiter", Rate_limiter.summary rl) ]
1,890✔
163
          !"%s $rate_limiter" Resource_pool.label )
164

165
  type wrapped_t =
166
    | Diff of
167
        (Resource_pool.Diff.verified Envelope.Incoming.t * Broadcast_callback.t)
168
    | Transition_frontier_extension of Resource_pool.transition_frontier_diff
169

170
  let of_resource_pool_and_diffs resource_pool ~logger ~constraint_constants
171
      ~tf_diffs ~log_gossip_heard ~on_remote_push ~block_window_duration =
172
    let read_broadcasts, write_broadcasts = Linear_pipe.create () in
588✔
173
    let network_pool =
588✔
174
      { resource_pool
175
      ; logger
176
      ; read_broadcasts
177
      ; write_broadcasts
178
      ; constraint_constants
179
      ; block_window_duration
180
      }
181
    in
182
    let remote_r, remote_w, remote_rl =
183
      Remote_sink.create ~log_gossip_heard ~on_push:on_remote_push
184
        ~wrap:(fun m -> Diff m)
×
185
        ~unwrap:(function
186
          | Diff m -> m | _ -> failwith "unexpected message type" )
×
187
        ~trace_label:Resource_pool.label ~logger resource_pool
188
        ~block_window_duration
189
    in
190
    let local_r, local_w, _ =
588✔
191
      Local_sink.create
192
        ~wrap:(fun m -> Diff m)
2✔
193
        ~unwrap:(function
194
          | Diff m -> m | _ -> failwith "unexpected message type" )
×
195
        ~trace_label:Resource_pool.label ~logger resource_pool
196
        ~block_window_duration
197
    in
198
    log_rate_limiter_occasionally network_pool remote_rl ;
588✔
199
    (*priority: Transition frontier diffs > local diffs > incoming diffs*)
200
    Deferred.don't_wait_for
588✔
201
      (O1trace.thread Resource_pool.label (fun () ->
588✔
202
           Strict_pipe.Reader.Merge.iter_sync
588✔
203
             [ Strict_pipe.Reader.map tf_diffs ~f:(fun diff ->
588✔
204
                   Transition_frontier_extension diff )
1,208✔
205
             ; remote_r
206
             ; local_r
207
             ]
208
             ~f:(fun diff_source ->
209
               match diff_source with
1,210✔
210
               | Diff ((verified_diff, cb) : Remote_sink.unwrapped_t) ->
2✔
211
                   O1trace.sync_thread processing_diffs_thread_label (fun () ->
212
                       apply_and_broadcast network_pool verified_diff cb )
2✔
213
               | Transition_frontier_extension diff ->
1,208✔
214
                   O1trace.sync_thread
215
                     processing_transition_frontier_diffs_thread_label
216
                     (fun () ->
217
                       Resource_pool.handle_transition_frontier_diff diff
1,208✔
218
                         resource_pool ) ) ) ) ;
219
    (network_pool, remote_w, local_w)
588✔
220

221
  (* Rebroadcast locally generated pool items every 10 minutes. Do so for 50
222
     minutes - at most 5 rebroadcasts - before giving up.
223

224
     The goal here is to be resilient to short term network failures and
225
     partitions. Note that with gossip we don't know anything about the state of
226
     other peers' pools (we know if something made it into a block, but that can
227
     take a long time and it's possible for things to be successfully received
228
     but never used in a block), so in a healthy network all repetition is spam.
229
     We need to balance reliability with efficiency. Exponential "backoff" would
230
     be better, but it'd complicate the interface between this module and the
231
     specific pool implementations.
232
  *)
233
  let rebroadcast_loop : t -> Logger.t -> unit Deferred.t =
234
   fun t logger ->
235
    let rebroadcast_interval = Time.Span.of_min 10. in
588✔
236
    let rebroadcast_window = Time.Span.scale rebroadcast_interval 5. in
588✔
237
    let has_timed_out time =
588✔
238
      if Time.(add time rebroadcast_window < now ()) then `Timed_out else `Ok
×
239
    in
240
    let rec go () =
241
      let rebroadcastable =
588✔
242
        Resource_pool.get_rebroadcastable t.resource_pool ~has_timed_out
243
      in
244
      if List.is_empty rebroadcastable then
588✔
245
        [%log trace] "Nothing to rebroadcast"
588✔
246
      else
247
        [%log debug]
×
248
          "Preparing to rebroadcast locally generated resource pool diffs \
249
           $diffs"
250
          ~metadata:
251
            [ ("count", `Int (List.length rebroadcastable))
×
252
            ; ( "diffs"
253
              , `List
254
                  (List.map
×
255
                     ~f:(fun d -> `String (Resource_pool.Diff.summary d))
×
256
                     rebroadcastable ) )
257
            ] ;
258
      let nonce = Time_ns.to_int_ns_since_epoch (Time_ns.now ()) in
588✔
259
      let%bind () =
260
        Deferred.List.iter rebroadcastable ~f:(fun message ->
588✔
261
            Linear_pipe.write t.write_broadcasts With_nonce.{ message; nonce } )
×
262
      in
263
      let%bind () = Async.after rebroadcast_interval in
588✔
264
      go ()
×
265
    in
266
    go ()
267

268
  let create ~config ~constraint_constants ~consensus_constants ~time_controller
269
      ~frontier_broadcast_pipe ~logger ~log_gossip_heard ~on_remote_push
270
      ~block_window_duration =
271
    (* Diffs from transition frontier extensions *)
272
    let tf_diff_reader, tf_diff_writer =
588✔
273
      Strict_pipe.(
274
        create ~name:"Network pool transition frontier diffs" Synchronous)
588✔
275
    in
276
    let t, locals, remotes =
277
      of_resource_pool_and_diffs
278
        (Resource_pool.create ~constraint_constants ~consensus_constants
279
           ~time_controller ~config ~logger ~frontier_broadcast_pipe
280
           ~tf_diff_writer )
281
        ~constraint_constants ~logger ~tf_diffs:tf_diff_reader ~log_gossip_heard
282
        ~on_remote_push ~block_window_duration
283
    in
284
    O1trace.background_thread rebroadcast_loop_thread_label (fun () ->
588✔
285
        rebroadcast_loop t logger ) ;
588✔
286
    (t, locals, remotes)
588✔
287
end
23✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc