• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

MinaProtocol / mina / 2863

05 Nov 2024 06:20PM UTC coverage: 30.754% (-16.6%) from 47.311%
2863

push

buildkite

web-flow
Merge pull request #16296 from MinaProtocol/dkijania/more_multi_jobs

more multi jobs in CI

20276 of 65930 relevant lines covered (30.75%)

8631.7 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

14.74
/src/lib/pipe_lib/strict_pipe.ml
1
open Async_kernel
7✔
2
open Core_kernel
3

4
exception Overflow of string
5

6
exception Multiple_reads_attempted of string
7

8
type crash = Overflow_behavior_crash
9

10
type drop_head = Overflow_behavior_drop_head
11

12
type call = Overflow_behavior_call
13

14
type (_, _, _) overflow_behavior =
15
  | Crash : ('a, crash, unit) overflow_behavior
16
  | Drop_head : ('a -> unit) -> ('a, drop_head, unit) overflow_behavior
17
  | Call : ('a -> 'r) -> ('a, call, 'r option) overflow_behavior
18

19
type synchronous = Type_synchronous
20

21
type _ buffered = Type_buffered
22

23
type (_, _, _) type_ =
24
  | Synchronous : ('a, synchronous, unit Deferred.t) type_
25
  | Buffered :
26
      [ `Capacity of int ] * [ `Overflow of ('a, 'b, 'r) overflow_behavior ]
27
      -> ('a, 'b buffered, 'r) type_
28

29
let value_or_empty = Option.value ~default:"<unnamed>"
30

31
module Reader0 = struct
32
  type 't t =
33
    { reader : 't Pipe.Reader.t
34
    ; mutable has_reader : bool
35
    ; mutable downstreams : downstreams
36
    ; name : string option
37
    }
38

39
  and downstreams =
40
    | [] : downstreams
41
    | ( :: ) : 'a t * downstreams -> downstreams
42

43
  let rec downstreams_from_list : 'a t list -> downstreams = function
44
    | [] ->
×
45
        []
46
    | r :: rs ->
×
47
        r :: downstreams_from_list rs
×
48

49
  (* TODO: See #1281 *)
50
  let to_linear_pipe { reader = pipe; has_reader; _ } =
51
    { Linear_pipe.Reader.pipe; has_reader }
×
52

53
  let of_linear_pipe ?name { Linear_pipe.Reader.pipe = reader; has_reader } =
54
    { reader; has_reader; downstreams = []; name }
20✔
55

56
  let pipe_name t = t.name
×
57

58
  let assert_not_read reader =
59
    if reader.has_reader then
40✔
60
      raise (Multiple_reads_attempted (value_or_empty reader.name))
×
61

62
  let wrap_reader ?name reader =
63
    { reader; has_reader = false; downstreams = []; name }
×
64

65
  let enforce_single_reader reader deferred =
66
    assert_not_read reader ;
40✔
67
    reader.has_reader <- true ;
40✔
68
    let%map result = deferred in
69
    reader.has_reader <- false ;
30✔
70
    result
71

72
  let read t = enforce_single_reader t (Pipe.read t.reader)
×
73

74
  let read' t = enforce_single_reader t (Pipe.read' t.reader)
×
75

76
  let fold reader ~init ~f =
77
    enforce_single_reader reader
40✔
78
      (let rec go b =
79
         match%bind Pipe.read reader.reader with
4,676✔
80
         | `Eof ->
30✔
81
             return b
82
         | `Ok a ->
4,636✔
83
             (* The async scheduler could yield here *)
84
             let%bind b' = f b a in
4,636✔
85
             go b'
4,636✔
86
       in
87
       go init )
40✔
88

89
  let fold_until reader ~init ~f =
90
    enforce_single_reader reader
×
91
      (let rec go b =
92
         match%bind Pipe.read reader.reader with
×
93
         | `Eof ->
×
94
             return (`Eof b)
95
         | `Ok a -> (
×
96
             (* The async scheduler could yield here *)
97
             match%bind f b a with
×
98
             | `Stop x ->
×
99
                 return (`Terminated x)
100
             | `Continue b' ->
×
101
                 go b' )
102
       in
103
       go init )
×
104

105
  let fold_without_pushback ?consumer reader ~init ~f =
106
    Pipe.fold_without_pushback ?consumer reader.reader ~init ~f
×
107

108
  let iter reader ~f = fold reader ~init:() ~f:(fun () -> f)
40✔
109

110
  let iter_without_pushback ?consumer ?continue_on_error reader ~f =
111
    Pipe.iter_without_pushback reader.reader ?consumer ?continue_on_error ~f
10✔
112

113
  let iter' reader ~f = Pipe.iter' reader.reader ~f
×
114

115
  let map reader ~f =
116
    assert_not_read reader ;
×
117
    reader.has_reader <- true ;
×
118
    let strict_reader =
119
      wrap_reader ?name:reader.name (Pipe.map reader.reader ~f)
×
120
    in
121
    reader.downstreams <- [ strict_reader ] ;
×
122
    strict_reader
123

124
  let filter_map reader ~f =
125
    assert_not_read reader ;
×
126
    reader.has_reader <- true ;
×
127
    let strict_reader =
128
      wrap_reader ?name:reader.name (Pipe.filter_map reader.reader ~f)
×
129
    in
130
    reader.downstreams <- [ strict_reader ] ;
×
131
    strict_reader
132

133
  let clear t = Pipe.clear t.reader
×
134

135
  let is_closed reader = Pipe.is_closed reader.reader
×
136

137
  module Merge = struct
138
    let iter readers ~f =
139
      let not_empty r = not @@ Pipe.is_empty r.reader in
×
140
      let rec read_deferred readers =
141
        let%bind ready_reader =
142
          match List.find readers ~f:not_empty with
143
          | Some reader ->
×
144
              Deferred.return (Some reader)
×
145
          | None ->
×
146
              let%map () =
147
                Deferred.choose
×
148
                  (List.map readers ~f:(fun r ->
×
149
                       Deferred.choice (Pipe.values_available r.reader)
×
150
                         (fun _ -> ()) ) )
×
151
              in
152
              List.find readers ~f:not_empty
×
153
        in
154
        match ready_reader with
×
155
        | Some reader -> (
×
156
            match Pipe.read_now reader.reader with
157
            | `Nothing_available ->
×
158
                failwith "impossible"
159
            | `Eof ->
×
160
                Deferred.unit
161
            | `Ok value ->
×
162
                Deferred.bind (f value) ~f:(fun () -> read_deferred readers) )
×
163
        | None -> (
×
164
            match List.filter readers ~f:(fun r -> not @@ is_closed r) with
×
165
            | [] ->
×
166
                Deferred.unit
167
            | open_readers ->
×
168
                read_deferred open_readers )
169
      in
170
      List.iter readers ~f:assert_not_read ;
171
      read_deferred readers
×
172

173
    let iter_sync readers ~f = iter readers ~f:(fun x -> f x ; Deferred.unit)
×
174
  end
175

176
  module Fork = struct
177
    let n reader count =
178
      let pipes = List.init count ~f:(fun _ -> Pipe.create ()) in
×
179
      let readers, writers = List.unzip pipes in
×
180
      (* This one place we _do_ want iter with pushback which we want to trigger
181
       * when all reads have pushed back downstream
182
       *
183
       * Since future reads will resolve via the iter_without_pushback, we
184
       * should still get the behavior we want. *)
185
      don't_wait_for
×
186
        (Pipe.iter reader.reader ~f:(fun x ->
×
187
             Deferred.List.iter writers ~f:(fun writer ->
×
188
                 if not (Pipe.is_closed writer) then Pipe.write writer x
×
189
                 else return () ) ) ) ;
×
190
      don't_wait_for
×
191
        (let%map () = Deferred.List.iter readers ~f:Pipe.closed in
×
192
         Pipe.close_read reader.reader ) ;
×
193
      let strict_readers =
×
194
        List.map readers ~f:(wrap_reader ?name:reader.name)
195
      in
196
      reader.downstreams <- downstreams_from_list strict_readers ;
×
197
      strict_readers
198

199
    let two reader =
200
      match n reader 2 with [ a; b ] -> (a, b) | _ -> failwith "unexpected"
×
201

202
    let three reader =
203
      match n reader 3 with
×
204
      | [ a; b; c ] ->
×
205
          (a, b, c)
206
      | _ ->
×
207
          failwith "unexpected"
208
  end
209

210
  let rec close_downstreams = function
211
    | [] ->
20✔
212
        ()
213
    (* The use of close_read is justified, because close_read would do
214
     * everything close does, and in addition:
215
     * 1. all pending flushes become determined with `Reader_closed.
216
     * 2. the pipe buffer is cleared.
217
     * 3. all subsequent reads will get `Eof. *)
218
    | r :: rs ->
×
219
        Pipe.close_read r.reader ; close_downstreams rs
×
220
end
221

222
module Writer = struct
223
  type ('t, 'type_, 'write_return) t =
224
    { type_ : ('t, 'type_, 'write_return) type_
225
    ; strict_reader : 't Reader0.t
226
    ; writer : 't Pipe.Writer.t
227
    ; warn_on_drop : bool
228
    ; name : string option
229
    }
230

231
  (* TODO: See #1281 *)
232
  let to_linear_pipe { writer = pipe; _ } = pipe
×
233

234
  let handle_buffered_write :
235
      type type_ return.
236
         ('t, type_, return) t
237
      -> 't
238
      -> capacity:int
239
      -> on_overflow:(unit -> return)
240
      -> normal_return:return
241
      -> return =
242
   fun t data ~capacity ~on_overflow ~normal_return ->
243
    if Pipe.length t.strict_reader.reader > capacity then on_overflow ()
×
244
    else (
200✔
245
      Pipe.write_without_pushback t.writer data ;
246
      normal_return )
200✔
247

248
  let write : type type_ return. ('t, type_, return) t -> 't -> return =
249
   fun writer data ->
250
    ( if Pipe.is_closed writer.writer then
4,237✔
251
      let logger = Logger.create () in
×
252
      [%log warn] "writing to closed pipe $name"
×
253
        ~metadata:
254
          [ ( "name"
255
            , `String
256
                (Sexplib.Sexp.to_string ([%sexp_of: string option] writer.name))
×
257
            )
258
          ] ) ;
259
    match writer.type_ with
4,237✔
260
    | Synchronous ->
4,037✔
261
        Pipe.write writer.writer data
262
    | Buffered (`Capacity capacity, `Overflow Crash) ->
200✔
263
        handle_buffered_write writer data ~capacity
264
          ~on_overflow:(fun () -> raise (Overflow (value_or_empty writer.name)))
×
265
          ~normal_return:()
266
    | Buffered (`Capacity capacity, `Overflow (Drop_head f)) ->
×
267
        handle_buffered_write writer data ~capacity
268
          ~on_overflow:(fun () ->
269
            let logger = Logger.create () in
×
270
            let my_name = Option.value writer.name ~default:"<unnamed>" in
×
271
            if writer.warn_on_drop then
×
272
              [%log warn]
×
273
                ~metadata:[ ("pipe_name", `String my_name) ]
274
                "Dropping message on pipe $pipe_name" ;
275
            ( match Pipe.read_now writer.strict_reader.reader with
×
276
            | `Ok head ->
×
277
                f head
×
278
            | _ ->
×
279
                () ) ;
280
            Pipe.write_without_pushback writer.writer data )
281
          ~normal_return:()
282
    | Buffered (`Capacity capacity, `Overflow (Call f)) ->
×
283
        handle_buffered_write writer data ~capacity
284
          ~on_overflow:(fun () -> Some (f data))
×
285
          ~normal_return:None
286

287
  let close { strict_reader; writer; _ } =
288
    Pipe.close writer ;
20✔
289
    Reader0.close_downstreams strict_reader.downstreams
20✔
290

291
  let kill { strict_reader; writer; _ } =
292
    Pipe.clear strict_reader.reader ;
×
293
    Pipe.close writer ;
×
294
    Reader0.close_downstreams strict_reader.downstreams
×
295

296
  let is_closed { writer; _ } = Pipe.is_closed writer
×
297

298
  let pipe_name : type type_ return. ('t, type_, return) t -> string option =
299
   fun writer -> writer.name
×
300
end
301

302
let create ?name ?(warn_on_drop = true) type_ =
32✔
303
  let reader, writer = Pipe.create () in
32✔
304
  let strict_reader =
32✔
305
    Reader0.{ reader; has_reader = false; downstreams = []; name }
306
  in
307
  let strict_writer =
308
    Writer.{ type_; strict_reader; warn_on_drop; writer; name }
309
  in
310
  (strict_reader, strict_writer)
311

312
let transfer reader Writer.{ strict_reader; writer; _ } ~f =
313
  Reader0.(reader.downstreams <- [ strict_reader ]) ;
×
314
  Reader0.enforce_single_reader reader (Pipe.transfer reader.reader writer ~f)
×
315

316
let rec transfer_while_writer_alive reader writer ~f =
317
  if Pipe.is_closed writer.Writer.writer then Deferred.unit
×
318
  else
319
    match%bind Pipe.read reader.Reader0.reader with
×
320
    | `Ok x ->
×
321
        let%bind () = Pipe.write_if_open writer.Writer.writer (f x) in
×
322
        transfer_while_writer_alive reader writer ~f
×
323
    | `Eof ->
×
324
        Pipe.close_read writer.Writer.strict_reader.reader ;
325
        Deferred.unit
×
326

327
module Reader = struct
328
  include Reader0
329

330
  let partition_map3 reader ~f =
331
    let (reader_a, writer_a), (reader_b, writer_b), (reader_c, writer_c) =
×
332
      (create Synchronous, create Synchronous, create Synchronous)
×
333
    in
334
    don't_wait_for
335
      (Reader0.iter reader ~f:(fun x ->
×
336
           match f x with
×
337
           | `Fst x ->
×
338
               Writer.write writer_a x
339
           | `Snd x ->
×
340
               Writer.write writer_b x
341
           | `Trd x ->
×
342
               Writer.write writer_c x ) ) ;
343
    don't_wait_for
×
344
      (let%map () = Pipe.closed reader_a.reader
×
345
       and () = Pipe.closed reader_b.reader
×
346
       and () = Pipe.closed reader_c.reader in
×
347
       Pipe.close_read reader.reader ) ;
×
348
    reader.downstreams <- [ reader_a; reader_b; reader_c ] ;
×
349
    (reader_a, reader_b, reader_c)
350
end
351

352
let%test_module "Strict_pipe.Reader.Merge" =
353
  ( module struct
354
    let%test_unit "'iter' would filter out the closed pipes" =
355
      Run_in_thread.block_on_async_exn (fun () ->
×
356
          let reader1, writer1 =
×
357
            create (Buffered (`Capacity 10, `Overflow (Drop_head ignore)))
358
          in
359
          let reader2, writer2 =
×
360
            create (Buffered (`Capacity 10, `Overflow (Drop_head ignore)))
361
          in
362
          Reader.Merge.iter [ reader1; reader2 ] ~f:(fun _ -> Deferred.unit)
×
363
          |> don't_wait_for ;
364
          Writer.write writer1 1 ;
×
365
          Writer.write writer2 2 ;
×
366
          Writer.close writer1 ;
×
367
          let%map () = after (Time_ns.Span.of_ms 5.) in
×
368
          Writer.write writer2 3 ; () )
×
369
  end )
370

371
let%test_module "Strict_pipe.close" =
372
  ( module struct
373
    let%test_unit "'close' would close a writer" =
374
      let _, writer = create Synchronous in
×
375
      assert (not (Writer.is_closed writer)) ;
×
376
      Writer.close writer ;
377
      assert (Writer.is_closed writer)
×
378

379
    let%test_unit "'close' would close a writer" =
380
      let _, writer = create (Buffered (`Capacity 64, `Overflow Crash)) in
×
381
      assert (not (Writer.is_closed writer)) ;
×
382
      Writer.close writer ;
383
      assert (Writer.is_closed writer)
×
384

385
    let%test_unit "'close' would close the downstream pipes linked by 'map'" =
386
      let input_reader, input_writer = create Synchronous in
×
387
      assert (not (Writer.is_closed input_writer)) ;
×
388
      let output_reader = Reader.map ~f:Fn.id input_reader in
389
      assert (not (Reader.is_closed output_reader)) ;
×
390
      Writer.close input_writer ;
391
      assert (Writer.is_closed input_writer) ;
×
392
      assert (Reader.is_closed output_reader)
×
393

394
    let%test_unit "'close' would close the downstream pipes linked by \
395
                   'filter_map'" =
396
      let input_reader, input_writer = create Synchronous in
×
397
      assert (not (Writer.is_closed input_writer)) ;
×
398
      let output_reader =
399
        Reader.filter_map ~f:(Fn.const (Some 1)) input_reader
×
400
      in
401
      assert (not (Reader.is_closed output_reader)) ;
×
402
      Writer.close input_writer ;
403
      assert (Writer.is_closed input_writer) ;
×
404
      assert (Reader.is_closed output_reader)
×
405

406
    let%test_unit "'close' would close the downstream pipes linked by 'Fork'" =
407
      let input_reader, input_writer = create Synchronous in
×
408
      assert (not (Writer.is_closed input_writer)) ;
×
409
      let output_reader1, output_reader2 = Reader.Fork.two input_reader in
410
      assert (not (Reader.is_closed output_reader1)) ;
×
411
      assert (not (Reader.is_closed output_reader2)) ;
×
412
      Writer.close input_writer ;
413
      assert (Writer.is_closed input_writer) ;
×
414
      assert (Reader.is_closed output_reader1) ;
×
415
      assert (Reader.is_closed output_reader2)
×
416

417
    let%test_unit "'close' would close the downstream pipes linked by \
418
                   'partition_map3'" =
419
      let input_reader, input_writer = create Synchronous in
×
420
      assert (not (Writer.is_closed input_writer)) ;
×
421
      let output_reader1, output_reader2, output_reader3 =
422
        Reader.partition_map3 input_reader ~f:(fun _ -> `Fst 1)
×
423
      in
424
      assert (not (Reader.is_closed output_reader1)) ;
×
425
      assert (not (Reader.is_closed output_reader2)) ;
×
426
      assert (not (Reader.is_closed output_reader3)) ;
×
427
      Writer.close input_writer ;
428
      assert (Writer.is_closed input_writer) ;
×
429
      assert (Reader.is_closed output_reader1) ;
×
430
      assert (Reader.is_closed output_reader2) ;
×
431
      assert (Reader.is_closed output_reader3)
×
432

433
    let%test_unit "'close' would close the downstream pipes linked by \
434
                   'transfer'" =
435
      let input_reader, input_writer = create Synchronous
×
436
      and _, output_writer = create Synchronous in
×
437
      assert (not (Writer.is_closed input_writer)) ;
×
438
      assert (not (Writer.is_closed output_writer)) ;
×
439
      let (_ : unit Deferred.t) =
440
        transfer input_reader output_writer ~f:Fn.id
441
      in
442
      Writer.close input_writer ;
×
443
      assert (Writer.is_closed input_writer) ;
×
444
      assert (Writer.is_closed output_writer)
×
445
  end )
14✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc