• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

MinaProtocol / mina / 2863

05 Nov 2024 06:20PM UTC coverage: 30.754% (-16.6%) from 47.311%
2863

push

buildkite

web-flow
Merge pull request #16296 from MinaProtocol/dkijania/more_multi_jobs

more multi jobs in CI

20276 of 65930 relevant lines covered (30.75%)

8631.7 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

1.5
/src/lib/pipe_lib/linear_pipe.ml
1
open Core_kernel
7✔
2
open Async_kernel
3
module Writer = Pipe.Writer
4

5
module Reader = struct
6
  type 'a t = { pipe : 'a Pipe.Reader.t; mutable has_reader : bool }
7
end
8

9
let create () =
10
  let r, w = Pipe.create () in
×
11
  ({ Reader.pipe = r; has_reader = false }, w)
×
12

13
let wrap_reader reader = { Reader.pipe = reader; has_reader = false }
×
14

15
let force_write_maybe_drop_head ~capacity writer reader x =
16
  if Pipe.length reader.Reader.pipe > capacity then
×
17
    ignore
×
18
      ( Pipe.read_now reader.Reader.pipe
×
19
        : [ `Eof | `Nothing_available | `Ok of 'a ] ) ;
20
  Pipe.write_without_pushback writer x
×
21

22
let create_reader ~close_on_exception f =
23
  let r = Pipe.create_reader ~close_on_exception f in
×
24
  { Reader.pipe = r; has_reader = false }
×
25

26
let write w x =
27
  ( if Pipe.is_closed w then
×
28
    let logger = Logger.create () in
×
29
    [%log warn] "writing to closed linear pipe" ~metadata:[] ) ;
×
30
  Pipe.write w x
×
31

32
let write_if_open = Pipe.write_if_open
33

34
let write_without_pushback = Pipe.write_without_pushback
35

36
let write_without_pushback_if_open = Pipe.write_without_pushback_if_open
37

38
exception Overflow
39

40
let write_or_exn ~capacity writer reader x =
41
  if Pipe.length reader.Reader.pipe > capacity then raise Overflow
×
42
  else Pipe.write_without_pushback writer x
×
43

44
let close_read (reader : 'a Reader.t) = Pipe.close_read reader.pipe
×
45

46
let close = Pipe.close
47

48
let closed (reader : 'a Reader.t) = Pipe.closed reader.pipe
×
49

50
let multiple_reads_error () =
51
  failwith
×
52
    "Linear_pipe.bracket: the same reader has been used multiple times. If you \
53
     want to rebroadcast the reader, use fork"
54

55
let bracket (reader : 'a Reader.t) dx =
56
  if reader.has_reader then multiple_reads_error ()
×
57
  else (
×
58
    reader.has_reader <- true ;
59
    let%map x = dx in
60
    reader.has_reader <- false ;
×
61
    x )
62

63
let set_has_reader (reader : 'a Reader.t) =
64
  if reader.has_reader then multiple_reads_error ()
×
65
  else reader.has_reader <- true
×
66

67
let iter ?flushed ?continue_on_error reader ~f =
68
  bracket reader (Pipe.iter reader.Reader.pipe ?flushed ?continue_on_error ~f)
×
69

70
let iter_unordered ?consumer ~max_concurrency reader ~f =
71
  bracket reader
×
72
    (let rec run_reader () =
73
       match%bind Pipe.read ?consumer reader.Reader.pipe with
×
74
       | `Eof ->
×
75
           return ()
76
       | `Ok v ->
×
77
           let%bind () = f v in
×
78
           run_reader ()
×
79
     in
80
     Deferred.all_unit (List.init max_concurrency ~f:(fun _ -> run_reader ()))
×
81
    )
82

83
let drain r = iter r ~f:(fun _ -> Deferred.unit)
×
84

85
let length reader = Pipe.length reader.Reader.pipe
×
86

87
let of_list xs =
88
  let reader = wrap_reader (Pipe.of_list xs) in
×
89
  reader
×
90

91
let to_list reader = Pipe.to_list reader.Reader.pipe
×
92

93
let fold reader ~init ~f =
94
  bracket reader (Pipe.fold reader.Reader.pipe ~init ~f)
×
95

96
(* Adapted from Async_kernel's fold impl *)
97
let scan reader ~init ~f =
98
  set_has_reader reader ;
×
99
  let r, w = Pipe.create () in
×
100
  let rec loop b =
×
101
    match Pipe.read_now reader.Reader.pipe with
×
102
    | `Eof ->
×
103
        return (Pipe.close w)
×
104
    | `Ok v ->
×
105
        let%bind next = f b v in
×
106
        let%bind () = Pipe.write w next in
×
107
        loop next
×
108
    | `Nothing_available ->
×
109
        let%bind _ = Pipe.values_available reader.Reader.pipe in
×
110
        loop b
×
111
  in
112
  don't_wait_for
113
    ( (* Force async ala https://github.com/janestreet/async_kernel/blob/master/src/pipe.ml#L703 *)
114
      return ()
×
115
    >>= fun () -> loop init ) ;
×
116
  wrap_reader r
×
117

118
let map (reader : 'a Reader.t) ~f =
119
  set_has_reader reader ;
×
120
  wrap_reader (Pipe.map reader.Reader.pipe ~f)
×
121

122
let filter_map (reader : 'a Reader.t) ~f =
123
  set_has_reader reader ;
×
124
  wrap_reader (Pipe.filter_map reader.Reader.pipe ~f)
×
125

126
let transfer reader writer ~f =
127
  bracket reader (Pipe.transfer reader.Reader.pipe writer ~f)
×
128

129
let transfer_id reader writer =
130
  bracket reader (Pipe.transfer_id reader.Reader.pipe writer)
×
131

132
let merge_unordered rs =
133
  let merged_reader, merged_writer = create () in
×
134
  List.iter rs ~f:(fun reader ->
×
135
      don't_wait_for (iter reader ~f:(fun x -> Pipe.write merged_writer x)) ) ;
×
136
  don't_wait_for
×
137
    (let%map () = Deferred.List.iter rs ~f:closed in
×
138
     Pipe.close merged_writer ) ;
×
139
  merged_reader
×
140

141
(* TODO following are all more efficient with iter',
142
 * but I get write' doesn't exist on my version of ocaml *)
143

144
let fork reader n =
145
  let pipes = List.init n ~f:(fun _ -> create ()) in
×
146
  let writers = List.map pipes ~f:(fun (_, w) -> w) in
×
147
  let readers = List.map pipes ~f:(fun (r, _) -> r) in
×
148
  don't_wait_for
×
149
    (iter reader ~f:(fun x ->
×
150
         Deferred.List.iter writers ~f:(fun writer ->
×
151
             if not (Pipe.is_closed writer) then Pipe.write writer x
×
152
             else return () ) ) ) ;
×
153
  don't_wait_for
×
154
    (let%map () = Deferred.List.iter readers ~f:closed in
×
155
     close_read reader ) ;
×
156
  readers
×
157

158
let fork2 reader =
159
  match fork reader 2 with [ x; y ] -> (x, y) | _ -> assert false
×
160

161
let fork3 reader =
162
  match fork reader 3 with [ x; y; z ] -> (x, y, z) | _ -> assert false
×
163

164
let fork4 reader =
165
  match fork reader 4 with [ x; y; z; w ] -> (x, y, z, w) | _ -> assert false
×
166

167
let fork5 reader =
168
  match fork reader 5 with
×
169
  | [ x; y; z; w; v ] ->
×
170
      (x, y, z, w, v)
171
  | _ ->
172
      assert false
173

174
let fork6 reader =
175
  match fork reader 6 with
×
176
  | [ x; y; z; w; v; u ] ->
×
177
      (x, y, z, w, v, u)
178
  | _ ->
179
      assert false
180

181
let partition_map2 reader ~f =
182
  let (reader_a, writer_a), (reader_b, writer_b) = (create (), create ()) in
×
183
  don't_wait_for
184
    (iter reader ~f:(fun x ->
×
185
         match f x with
×
186
         | `Fst x ->
×
187
             Pipe.write writer_a x
188
         | `Snd x ->
×
189
             Pipe.write writer_b x ) ) ;
190
  don't_wait_for
×
191
    (let%map () = closed reader_a and () = closed reader_b in
×
192
     close_read reader ) ;
×
193
  (reader_a, reader_b)
×
194

195
let partition_map3 reader ~f =
196
  let (reader_a, writer_a), (reader_b, writer_b), (reader_c, writer_c) =
×
197
    (create (), create (), create ())
×
198
  in
199
  don't_wait_for
200
    (iter reader ~f:(fun x ->
×
201
         match f x with
×
202
         | `Fst x ->
×
203
             Pipe.write writer_a x
204
         | `Snd x ->
×
205
             Pipe.write writer_b x
206
         | `Trd x ->
×
207
             Pipe.write writer_c x ) ) ;
208
  don't_wait_for
×
209
    (let%map () = closed reader_a
×
210
     and () = closed reader_b
×
211
     and () = closed reader_c in
×
212
     close_read reader ) ;
×
213
  (reader_a, reader_b, reader_c)
×
214

215
let filter_map_unordered ~max_concurrency t ~f =
216
  let reader, writer = create () in
×
217
  don't_wait_for
×
218
    (iter_unordered ~max_concurrency t ~f:(fun x ->
×
219
         match%bind f x with Some y -> Pipe.write writer y | None -> return () )
×
220
    ) ;
221
  don't_wait_for
×
222
    (let%map () = closed reader in
×
223
     close_read t ) ;
×
224
  reader
×
225

226
let latest_ref t ~initial =
227
  let cell = ref initial in
×
228
  don't_wait_for (iter t ~f:(fun a -> return (cell := a))) ;
×
229
  cell
×
230

231
let values_available ({ pipe; _ } : 'a Reader.t) = Pipe.values_available pipe
×
232

233
let peek ({ pipe; _ } : 'a Reader.t) = Pipe.peek pipe
×
234

235
let release_has_reader (reader : 'a Reader.t) =
236
  if not reader.has_reader then
×
237
    failwith "Linear_pipe.bracket: did not have reader"
×
238
  else reader.has_reader <- false
×
239

240
let read_now reader =
241
  set_has_reader reader ;
×
242
  let res = Pipe.read_now reader.pipe in
×
243
  release_has_reader reader ; res
×
244

245
let read' ?max_queue_length ({ pipe; _ } : 'a Reader.t) =
246
  Pipe.read' ?max_queue_length pipe
×
247

248
let read ({ pipe; _ } : 'a Reader.t) = Pipe.read pipe
×
249

250
let read_exn reader =
251
  match%map read reader with
×
252
  | `Eof ->
×
253
      failwith "Expecting a value from reader"
254
  | `Ok value ->
×
255
      value
14✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc