• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

MinaProtocol / mina / 2863

05 Nov 2024 06:20PM UTC coverage: 30.754% (-16.6%) from 47.311%
2863

push

buildkite

web-flow
Merge pull request #16296 from MinaProtocol/dkijania/more_multi_jobs

more multi jobs in CI

20276 of 65930 relevant lines covered (30.75%)

8631.7 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

2.47
/src/lib/syncable_ledger/syncable_ledger.ml
1
open Core_kernel
7✔
2
open Async_kernel
3
open Pipe_lib
4
open Network_peer
5

6
type Structured_log_events.t += Snarked_ledger_synced
7
  [@@deriving register_event { msg = "Snarked database sync'd. All done" }]
×
8

9
(** Run f recursively n times, starting with value r.
10
    e.g. funpow 3 f r = f (f (f r)) *)
11
let rec funpow n f r = if n > 0 then funpow (n - 1) f (f r) else r
×
12

13
module Query = struct
14
  [%%versioned
15
  module Stable = struct
16
    module V1 = struct
17
      type 'addr t =
7✔
18
        | What_child_hashes of 'addr
×
19
            (** What are the hashes of the children of this address? *)
20
        | What_contents of 'addr
×
21
            (** What accounts are at this address? addr must have depth
22
            tree_depth - account_subtree_height *)
23
        | Num_accounts
×
24
            (** How many accounts are there? Used to size data structure and
25
            figure out what part of the tree is filled in. *)
26
      [@@deriving sexp, yojson, hash, compare]
21✔
27
    end
28
  end]
29
end
30

31
module Answer = struct
32
  [%%versioned
33
  module Stable = struct
34
    module V1 = struct
35
      type ('hash, 'account) t =
7✔
36
        | Child_hashes_are of 'hash * 'hash
×
37
            (** The requested address's children have these hashes **)
38
        | Contents_are of 'account list
×
39
            (** The requested address has these accounts *)
40
        | Num_accounts of int * 'hash
×
41
            (** There are this many accounts and the smallest subtree that
42
                contains all non-empty nodes has this hash. *)
43
      [@@deriving sexp, yojson]
21✔
44

45
      let to_latest acct_to_latest = function
46
        | Child_hashes_are (h1, h2) ->
×
47
            Child_hashes_are (h1, h2)
48
        | Contents_are accts ->
×
49
            Contents_are (List.map ~f:acct_to_latest accts)
×
50
        | Num_accounts (i, h) ->
×
51
            Num_accounts (i, h)
52
    end
53
  end]
54
end
55

56
module type Inputs_intf = sig
57
  module Addr : module type of Merkle_address
58

59
  module Account : sig
60
    type t [@@deriving bin_io, sexp, yojson]
61
  end
62

63
  module Hash : Merkle_ledger.Intf.Hash with type account := Account.t
64

65
  module Root_hash : sig
66
    type t [@@deriving equal, sexp, yojson]
67

68
    val to_hash : t -> Hash.t
69
  end
70

71
  module MT :
72
    Merkle_ledger.Intf.SYNCABLE
73
      with type hash := Hash.t
74
       and type root_hash := Root_hash.t
75
       and type addr := Addr.t
76
       and type account := Account.t
77

78
  val account_subtree_height : int
79
end
80

81
module type S = sig
82
  type 'a t [@@deriving sexp]
83

84
  type merkle_tree
85

86
  type merkle_path
87

88
  type hash
89

90
  type root_hash
91

92
  type addr
93

94
  type diff
95

96
  type account
97

98
  type index = int
99

100
  type query
101

102
  type answer
103

104
  module Responder : sig
105
    type t
106

107
    val create :
108
         merkle_tree
109
      -> (query -> unit)
110
      -> logger:Logger.t
111
      -> trust_system:Trust_system.t
112
      -> t
113

114
    val answer_query :
115
      t -> query Envelope.Incoming.t -> answer option Deferred.t
116
  end
117

118
  val create :
119
    merkle_tree -> logger:Logger.t -> trust_system:Trust_system.t -> 'a t
120

121
  val answer_writer :
122
       'a t
123
    -> (root_hash * query * answer Envelope.Incoming.t) Linear_pipe.Writer.t
124

125
  val query_reader : 'a t -> (root_hash * query) Linear_pipe.Reader.t
126

127
  val destroy : 'a t -> unit
128

129
  val new_goal :
130
       'a t
131
    -> root_hash
132
    -> data:'a
133
    -> equal:('a -> 'a -> bool)
134
    -> [ `Repeat | `New | `Update_data ]
135

136
  val peek_valid_tree : 'a t -> merkle_tree option
137

138
  val valid_tree : 'a t -> (merkle_tree * 'a) Deferred.t
139

140
  val wait_until_valid :
141
       'a t
142
    -> root_hash
143
    -> [ `Ok of merkle_tree | `Target_changed of root_hash option * root_hash ]
144
       Deferred.t
145

146
  val fetch :
147
       'a t
148
    -> root_hash
149
    -> data:'a
150
    -> equal:('a -> 'a -> bool)
151
    -> [ `Ok of merkle_tree | `Target_changed of root_hash option * root_hash ]
152
       Deferred.t
153

154
  val apply_or_queue_diff : 'a t -> diff -> unit
155

156
  val merkle_path_at_addr : 'a t -> addr -> merkle_path Or_error.t
157

158
  val get_account_at_addr : 'a t -> addr -> account Or_error.t
159
end
160

161
(*
162

163
Every node of the merkle tree is always in one of three states:
164

165
- Fresh.
166
  The current contents for this node in the MT match what we
167
  expect.
168
- Stale
169
  The current contents for this node in the MT do _not_ match
170
  what we expect.
171
- Unknown.
172
  We don't know what to expect yet.
173

174

175
Although every node conceptually has one of these states, and can
176
make a transition at any time, the syncer operates only along a
177
"frontier" of the tree, which consists of the deepest Stale nodes.
178

179
The goal of the ledger syncer is to make the root node be fresh,
180
starting from it being stale.
181

182
The syncer usually operates exclusively on these frontier nodes
183
and their direct children. However, the goal hash can change
184
while the syncer is running, and at that point every non-root node
185
conceptually becomes Unknown, and we need to restart. However, we
186
don't need to restart completely: in practice, only small portions
187
of the merkle tree change between goals, and we can re-use the "Stale"
188
nodes we already have if the expected hash doesn't change.
189

190
*)
191
(*
192
Note: while syncing, the underlying ledger is in an
193
indeterminate state. We're mutating hashes at internal
194
nodes without updating their children. In fact, we
195
don't even set all the hashes for the internal nodes!
196
(When we hit a height=N subtree, we don't do anything
197
with the hashes in the bottomost N-1 internal nodes).
198
*)
199

200
module Make (Inputs : Inputs_intf) : sig
201
  open Inputs
202

203
  include
204
    S
205
      with type merkle_tree := MT.t
206
       and type hash := Hash.t
207
       and type root_hash := Root_hash.t
208
       and type addr := Addr.t
209
       and type merkle_path := MT.path
210
       and type account := Account.t
211
       and type query := Addr.t Query.t
212
       and type answer := (Hash.t, Account.t) Answer.t
213
end = struct
214
  open Inputs
215

216
  type diff = unit
217

218
  type index = int
219

220
  type answer = (Hash.t, Account.t) Answer.t
221

222
  type query = Addr.t Query.t
223

224
  module Responder = struct
225
    type t =
226
      { mt : MT.t
227
      ; f : query -> unit
228
      ; logger : Logger.t
229
      ; trust_system : Trust_system.t
230
      }
231

232
    let create :
233
           MT.t
234
        -> (query -> unit)
235
        -> logger:Logger.t
236
        -> trust_system:Trust_system.t
237
        -> t =
238
     fun mt f ~logger ~trust_system -> { mt; f; logger; trust_system }
×
239

240
    let answer_query :
241
        t -> query Envelope.Incoming.t -> answer option Deferred.t =
242
     fun { mt; f; logger; trust_system } query_envelope ->
243
      let open Trust_system in
×
244
      let ledger_depth = MT.depth mt in
245
      let sender = Envelope.Incoming.sender query_envelope in
×
246
      let query = Envelope.Incoming.data query_envelope in
×
247
      f query ;
×
248
      let response_or_punish =
×
249
        match query with
250
        | What_child_hashes a -> (
×
251
            match
252
              let open Or_error.Let_syntax in
253
              let%bind lchild = Addr.child ~ledger_depth a Direction.Left in
×
254
              let%bind rchild = Addr.child ~ledger_depth a Direction.Right in
×
255
              Or_error.try_with (fun () ->
×
256
                  Answer.Child_hashes_are
×
257
                    ( MT.get_inner_hash_at_addr_exn mt lchild
×
258
                    , MT.get_inner_hash_at_addr_exn mt rchild ) )
×
259
            with
260
            | Ok answer ->
×
261
                Either.First answer
262
            | Error e ->
×
263
                let logger = Logger.create () in
264
                [%log error]
×
265
                  ~metadata:[ ("error", Error_json.error_to_yojson e) ]
×
266
                  "When handling What_child_hashes request, the following \
267
                   error happended: $error" ;
268
                Either.Second
×
269
                  ( Actions.Violated_protocol
270
                  , Some
271
                      ( "invalid address $addr in What_child_hashes request"
272
                      , [ ("addr", Addr.to_yojson a) ] ) ) )
×
273
        | What_contents a ->
×
274
            if Addr.height ~ledger_depth a > account_subtree_height then
×
275
              Either.Second
×
276
                ( Actions.Violated_protocol
277
                , Some
278
                    ( "requested too big of a subtree at once: $addr"
279
                    , [ ("addr", Addr.to_yojson a) ] ) )
×
280
            else
281
              let addresses_and_accounts =
×
282
                List.sort ~compare:(fun (addr1, _) (addr2, _) ->
283
                    Addr.compare addr1 addr2 )
×
284
                @@ MT.get_all_accounts_rooted_at_exn mt a
×
285
                (* can't actually throw *)
286
              in
287
              let addresses, accounts = List.unzip addresses_and_accounts in
×
288
              if List.is_empty addresses then
×
289
                (* Peer should know what portions of the tree are full from the
290
                   Num_accounts query. *)
291
                Either.Second
×
292
                  ( Actions.Violated_protocol
293
                  , Some
294
                      ( "Requested empty subtree: $addr"
295
                      , [ ("addr", Addr.to_yojson a) ] ) )
×
296
              else
297
                let first_address, rest_address =
×
298
                  (List.hd_exn addresses, List.tl_exn addresses)
×
299
                in
300
                let missing_address, is_compact =
301
                  List.fold rest_address
302
                    ~init:(Addr.next first_address, true)
×
303
                    ~f:(fun (expected_address, is_compact) actual_address ->
304
                      if
×
305
                        is_compact
306
                        && [%equal: Addr.t option] expected_address
×
307
                             (Some actual_address)
308
                      then (Addr.next actual_address, true)
×
309
                      else (expected_address, false) )
×
310
                in
311
                if not is_compact then (
×
312
                  (* indicates our ledger is invalid somehow. *)
313
                  [%log fatal]
×
314
                    ~metadata:
315
                      [ ( "missing_address"
316
                        , Addr.to_yojson (Option.value_exn missing_address) )
×
317
                      ; ( "addresses_and_accounts"
318
                        , `List
319
                            (List.map addresses_and_accounts
×
320
                               ~f:(fun (addr, account) ->
321
                                 `Tuple
×
322
                                   [ Addr.to_yojson addr
×
323
                                   ; Account.to_yojson account
×
324
                                   ] ) ) )
325
                      ]
326
                    "Missing an account at address: $missing_address inside \
327
                     the list: $addresses_and_accounts" ;
328
                  assert false )
×
329
                else Either.First (Answer.Contents_are accounts)
×
330
        | Num_accounts ->
×
331
            let len = MT.num_accounts mt in
332
            let height = Int.ceil_log2 len in
×
333
            (* FIXME: bug when height=0 https://github.com/o1-labs/nanobit/issues/365 *)
334
            let content_root_addr =
×
335
              funpow
336
                (MT.depth mt - height)
×
337
                (fun a -> Addr.child_exn ~ledger_depth a Direction.Left)
×
338
                (Addr.root ())
×
339
            in
340
            Either.First
×
341
              (Num_accounts
342
                 (len, MT.get_inner_hash_at_addr_exn mt content_root_addr) )
×
343
      in
344
      match response_or_punish with
345
      | Either.First answer ->
×
346
          Deferred.return @@ Some answer
347
      | Either.Second action ->
×
348
          let%map _ =
349
            record_envelope_sender trust_system logger sender action
×
350
          in
351
          None
×
352
  end
353

354
  type 'a t =
355
    { mutable desired_root : Root_hash.t option
356
    ; mutable auxiliary_data : 'a option
357
    ; tree : MT.t
358
    ; logger : Logger.t
359
    ; trust_system : Trust_system.t
360
    ; answers :
361
        (Root_hash.t * query * answer Envelope.Incoming.t) Linear_pipe.Reader.t
362
    ; answer_writer :
363
        (Root_hash.t * query * answer Envelope.Incoming.t) Linear_pipe.Writer.t
364
    ; queries : (Root_hash.t * query) Linear_pipe.Writer.t
365
    ; query_reader : (Root_hash.t * query) Linear_pipe.Reader.t
366
    ; waiting_parents : Hash.t Addr.Table.t
367
          (** Addresses we are waiting for the children of, and the expected
368
              hash of the node with the address. *)
369
    ; waiting_content : Hash.t Addr.Table.t
370
    ; mutable validity_listener :
371
        [ `Ok | `Target_changed of Root_hash.t option * Root_hash.t ] Ivar.t
372
    }
373

374
  let t_of_sexp _ = failwith "t_of_sexp: not implemented"
×
375

376
  let sexp_of_t _ = failwith "sexp_of_t: not implemented"
×
377

378
  let desired_root_exn { desired_root; _ } = desired_root |> Option.value_exn
×
379

380
  let destroy t =
381
    Linear_pipe.close_read t.answers ;
×
382
    Linear_pipe.close_read t.query_reader
×
383

384
  let answer_writer t = t.answer_writer
×
385

386
  let query_reader t = t.query_reader
×
387

388
  let expect_children : 'a t -> Addr.t -> Hash.t -> unit =
389
   fun t parent_addr expected ->
390
    [%log' trace t.logger]
×
391
      ~metadata:
392
        [ ("parent_address", Addr.to_yojson parent_addr)
×
393
        ; ("hash", Hash.to_yojson expected)
×
394
        ]
395
      "Expecting children parent $parent_address, expected: $hash" ;
396
    Addr.Table.add_exn t.waiting_parents ~key:parent_addr ~data:expected
×
397

398
  let expect_content : 'a t -> Addr.t -> Hash.t -> unit =
399
   fun t addr expected ->
400
    [%log' trace t.logger]
×
401
      ~metadata:
402
        [ ("address", Addr.to_yojson addr); ("hash", Hash.to_yojson expected) ]
×
403
      "Expecting content addr $address, expected: $hash" ;
404
    Addr.Table.add_exn t.waiting_content ~key:addr ~data:expected
×
405

406
  (** Given an address and the accounts below that address, fill in the tree
407
      with them. *)
408
  let add_content :
409
         'a t
410
      -> Addr.t
411
      -> Account.t list
412
      -> [ `Success
413
         | `Hash_mismatch of Hash.t * Hash.t  (** expected hash, actual *) ] =
414
   fun t addr content ->
415
    let expected = Addr.Table.find_exn t.waiting_content addr in
×
416
    (* TODO #444 should we batch all the updates and do them at the end? *)
417
    (* We might write the wrong data to the underlying ledger here, but if so
418
       we'll requeue the address and it'll be overwritten. *)
419
    MT.set_all_accounts_rooted_at_exn t.tree addr content ;
×
420
    Addr.Table.remove t.waiting_content addr ;
×
421
    [%log' trace t.logger]
×
422
      ~metadata:
423
        [ ("address", Addr.to_yojson addr); ("hash", Hash.to_yojson expected) ]
×
424
      "Found content addr $address, with hash $hash, removing from waiting \
425
       content" ;
426
    let actual = MT.get_inner_hash_at_addr_exn t.tree addr in
×
427
    if Hash.equal actual expected then `Success
×
428
    else `Hash_mismatch (expected, actual)
×
429

430
  (** Given an address and the hashes of the children of the corresponding node,
431
      check the children hash to the expected value. If they do, queue the
432
      children for retrieval if the values in the underlying ledger don't match
433
      the hashes we got from the network. *)
434
  let add_child_hashes_to :
435
         'a t
436
      -> Addr.t
437
      -> Hash.t
438
      -> Hash.t
439
      -> [ `Good of (Addr.t * Hash.t) list
440
           (** The addresses and expected hashes of the now-retrievable children *)
441
         | `Hash_mismatch of Hash.t * Hash.t
442
           (** Hash check failed, peer lied. First parameter expected, second parameter actual. *)
443
         ] =
444
   fun t parent_addr lh rh ->
445
    let ledger_depth = MT.depth t.tree in
×
446
    let la, ra =
×
447
      Option.value_exn ~message:"Tried to fetch a leaf as if it was a node"
448
        ( Or_error.ok
×
449
        @@ Or_error.both
×
450
             (Addr.child ~ledger_depth parent_addr Direction.Left)
×
451
             (Addr.child ~ledger_depth parent_addr Direction.Right) )
×
452
    in
453
    let expected =
×
454
      Option.value_exn ~message:"Forgot to wait for a node"
455
        (Addr.Table.find t.waiting_parents parent_addr)
×
456
    in
457
    let merged_hash =
×
458
      (* Height here is the height of the things we're merging, so one less than
459
         the parent height. *)
460
      Hash.merge ~height:(ledger_depth - Addr.depth parent_addr - 1) lh rh
×
461
    in
462
    if Hash.equal merged_hash expected then (
×
463
      (* Fetch the children of a node if the hash in the underlying ledger
464
         doesn't match what we got. *)
465
      let should_fetch_children addr hash =
466
        not @@ Hash.equal (MT.get_inner_hash_at_addr_exn t.tree addr) hash
×
467
      in
468
      let subtrees_to_fetch =
469
        [ (la, lh); (ra, rh) ]
470
        |> List.filter ~f:(Tuple2.uncurry should_fetch_children)
×
471
      in
472
      Addr.Table.remove t.waiting_parents parent_addr ;
×
473
      `Good subtrees_to_fetch )
×
474
    else `Hash_mismatch (expected, merged_hash)
×
475

476
  let all_done t =
477
    if not (Root_hash.equal (MT.merkle_root t.tree) (desired_root_exn t)) then
×
478
      failwith "We finished syncing, but made a mistake somewhere :("
×
479
    else (
×
480
      if Ivar.is_full t.validity_listener then
481
        [%log' error t.logger] "Ivar.fill bug is here!" ;
×
482
      Ivar.fill t.validity_listener `Ok )
×
483

484
  (** Compute the hash of an empty tree of the specified height. *)
485
  let empty_hash_at_height h =
486
    let rec go prev ctr =
×
487
      if ctr = h then prev else go (Hash.merge ~height:ctr prev prev) (ctr + 1)
×
488
    in
489
    go Hash.empty_account 0
490

491
  (** Given the hash of the smallest subtree that contains all accounts, the
492
      height of that hash in the tree and the height of the whole tree, compute
493
      the hash of the whole tree. *)
494
  let complete_with_empties hash start_height result_height =
495
    let rec go cur_empty prev_hash height =
×
496
      if height = result_height then prev_hash
×
497
      else
498
        let cur = Hash.merge ~height prev_hash cur_empty in
×
499
        let next_empty = Hash.merge ~height cur_empty cur_empty in
×
500
        go next_empty cur (height + 1)
×
501
    in
502
    go (empty_hash_at_height start_height) hash start_height
×
503

504
  (** Given an address and the hash of the corresponding subtree, start getting
505
      the children.
506
  *)
507
  let handle_node t addr exp_hash =
508
    if Addr.depth addr >= MT.depth t.tree - account_subtree_height then (
×
509
      expect_content t addr exp_hash ;
510
      Linear_pipe.write_without_pushback_if_open t.queries
×
511
        (desired_root_exn t, What_contents addr) )
×
512
    else (
×
513
      expect_children t addr exp_hash ;
514
      Linear_pipe.write_without_pushback_if_open t.queries
×
515
        (desired_root_exn t, What_child_hashes addr) )
×
516

517
  (** Handle the initial Num_accounts message, starting the main syncing
518
      process. *)
519
  let handle_num_accounts :
520
      'a t -> int -> Hash.t -> [ `Success | `Hash_mismatch of Hash.t * Hash.t ]
521
      =
522
   fun t n content_hash ->
523
    let rh = Root_hash.to_hash (desired_root_exn t) in
×
524
    let height = Int.ceil_log2 n in
×
525
    (* FIXME: bug when height=0 https://github.com/o1-labs/nanobit/issues/365 *)
526
    let actual = complete_with_empties content_hash height (MT.depth t.tree) in
×
527
    if Hash.equal actual rh then (
×
528
      Addr.Table.clear t.waiting_parents ;
529
      (* We should use this information to set the empty account slots empty and
530
         start syncing at the content root. See #1972. *)
531
      Addr.Table.clear t.waiting_content ;
×
532
      handle_node t (Addr.root ()) rh ;
×
533
      `Success )
×
534
    else `Hash_mismatch (rh, actual)
×
535

536
  let main_loop t =
537
    let handle_answer :
×
538
           Root_hash.t
539
           * Addr.t Query.t
540
           * (Hash.t, Account.t) Answer.t Envelope.Incoming.t
541
        -> unit Deferred.t =
542
     fun (root_hash, query, env) ->
543
      (* NOTE: think about synchronization here. This is deferred now, so
544
         the t and the underlying ledger can change while processing is
545
         happening. *)
546
      let already_done =
×
547
        match Ivar.peek t.validity_listener with Some `Ok -> true | _ -> false
×
548
      in
549
      let sender = Envelope.Incoming.sender env in
550
      let answer = Envelope.Incoming.data env in
×
551
      [%log' trace t.logger]
×
552
        ~metadata:
553
          [ ("root_hash", Root_hash.to_yojson root_hash)
×
554
          ; ("query", Query.to_yojson Addr.to_yojson query)
×
555
          ]
556
        "Handle answer for $root_hash" ;
557
      if not (Root_hash.equal root_hash (desired_root_exn t)) then (
×
558
        [%log' trace t.logger]
×
559
          ~metadata:
560
            [ ("desired_hash", Root_hash.to_yojson (desired_root_exn t))
×
561
            ; ("ignored_hash", Root_hash.to_yojson root_hash)
×
562
            ]
563
          "My desired root was $desired_hash, so I'm ignoring $ignored_hash" ;
564
        Deferred.unit )
×
565
      else if already_done then (
×
566
        (* This can happen if we asked for hashes that turn out to be equal in
567
           underlying ledger and the target. *)
568
        [%log' debug t.logger]
×
569
          "Got sync response when we're already finished syncing" ;
570
        Deferred.unit )
×
571
      else
572
        let open Trust_system in
×
573
        (* If a peer misbehaves we still need the information we asked them for,
574
           so requeue in that case. *)
575
        let requeue_query () =
576
          Linear_pipe.write_without_pushback_if_open t.queries (root_hash, query)
×
577
        in
578
        let credit_fulfilled_request () =
579
          record_envelope_sender t.trust_system t.logger sender
×
580
            ( Actions.Fulfilled_request
581
            , Some
582
                ( "sync ledger query $query"
583
                , [ ("query", Query.to_yojson Addr.to_yojson query) ] ) )
×
584
        in
585
        let%bind _ =
586
          match (query, answer) with
587
          | Query.What_child_hashes addr, Answer.Child_hashes_are (lh, rh) -> (
×
588
              match add_child_hashes_to t addr lh rh with
589
              | `Hash_mismatch (expected, actual) ->
×
590
                  let%map () =
591
                    record_envelope_sender t.trust_system t.logger sender
×
592
                      ( Actions.Sent_bad_hash
593
                      , Some
594
                          ( "sent child hashes $lhash and $rhash for address \
595
                             $addr, they merge hash to $actualmerge but we \
596
                             expected $expectedmerge"
597
                          , [ ("lhash", Hash.to_yojson lh)
×
598
                            ; ("rhash", Hash.to_yojson rh)
×
599
                            ; ("actualmerge", Hash.to_yojson actual)
×
600
                            ; ("expectedmerge", Hash.to_yojson expected)
×
601
                            ] ) )
602
                  in
603
                  requeue_query ()
×
604
              | `Good children_to_verify ->
×
605
                  (* TODO #312: Make sure we don't write too much *)
606
                  List.iter children_to_verify ~f:(fun (addr, hash) ->
607
                      handle_node t addr hash ) ;
×
608
                  credit_fulfilled_request () )
×
609
          | Query.What_contents addr, Answer.Contents_are leaves -> (
×
610
              match add_content t addr leaves with
611
              | `Success ->
×
612
                  credit_fulfilled_request ()
×
613
              | `Hash_mismatch (expected, actual) ->
×
614
                  let%map () =
615
                    record_envelope_sender t.trust_system t.logger sender
×
616
                      ( Actions.Sent_bad_hash
617
                      , Some
618
                          ( "sent accounts $accounts for address $addr, they \
619
                             hash to $actual but we expected $expected"
620
                          , [ ( "accounts"
621
                              , `List (List.map ~f:Account.to_yojson leaves) )
×
622
                            ; ("addr", Addr.to_yojson addr)
×
623
                            ; ("actual", Hash.to_yojson actual)
×
624
                            ; ("expected", Hash.to_yojson expected)
×
625
                            ] ) )
626
                  in
627
                  requeue_query () )
×
628
          | Query.Num_accounts, Answer.Num_accounts (count, content_root) -> (
×
629
              match handle_num_accounts t count content_root with
630
              | `Success ->
×
631
                  credit_fulfilled_request ()
×
632
              | `Hash_mismatch (expected, actual) ->
×
633
                  let%map () =
634
                    record_envelope_sender t.trust_system t.logger sender
×
635
                      ( Actions.Sent_bad_hash
636
                      , Some
637
                          ( "Claimed num_accounts $count, content root hash \
638
                             $content_root_hash, that implies a root hash of \
639
                             $actual, we expected $expected"
640
                          , [ ("count", `Int count)
641
                            ; ("content_root_hash", Hash.to_yojson content_root)
×
642
                            ; ("actual", Hash.to_yojson actual)
×
643
                            ; ("expected", Hash.to_yojson expected)
×
644
                            ] ) )
645
                  in
646
                  requeue_query () )
×
647
          | query, answer ->
×
648
              let%map () =
649
                record_envelope_sender t.trust_system t.logger sender
×
650
                  ( Actions.Violated_protocol
651
                  , Some
652
                      ( "Answered question we didn't ask! Query was $query \
653
                         answer was $answer"
654
                      , [ ("query", Query.to_yojson Addr.to_yojson query)
×
655
                        ; ( "answer"
656
                          , Answer.to_yojson Hash.to_yojson Account.to_yojson
×
657
                              answer )
658
                        ] ) )
659
              in
660
              requeue_query ()
×
661
        in
662
        if
×
663
          Root_hash.equal
664
            (Option.value_exn t.desired_root)
×
665
            (MT.merkle_root t.tree)
×
666
        then (
×
667
          [%str_log' trace t.logger] Snarked_ledger_synced ;
×
668
          all_done t ) ;
×
669
        Deferred.unit
×
670
    in
671
    Linear_pipe.iter t.answers ~f:handle_answer
672

673
  let new_goal t h ~data ~equal =
674
    let should_skip =
×
675
      match t.desired_root with
676
      | None ->
×
677
          false
678
      | Some h' ->
×
679
          Root_hash.equal h h'
×
680
    in
681
    if not should_skip then (
×
682
      Option.iter t.desired_root ~f:(fun root_hash ->
683
          [%log' debug t.logger]
×
684
            ~metadata:
685
              [ ("old_root_hash", Root_hash.to_yojson root_hash)
×
686
              ; ("new_root_hash", Root_hash.to_yojson h)
×
687
              ]
688
            "New_goal: changing target from $old_root_hash to $new_root_hash" ) ;
689
      Ivar.fill_if_empty t.validity_listener
×
690
        (`Target_changed (t.desired_root, h)) ;
691
      t.validity_listener <- Ivar.create () ;
×
692
      t.desired_root <- Some h ;
693
      t.auxiliary_data <- Some data ;
694
      Linear_pipe.write_without_pushback_if_open t.queries (h, Num_accounts) ;
695
      `New )
×
696
    else if
×
697
      Option.fold t.auxiliary_data ~init:false ~f:(fun _ saved_data ->
698
          equal data saved_data )
×
699
    then (
×
700
      [%log' debug t.logger] "New_goal to same hash, not doing anything" ;
×
701
      `Repeat )
×
702
    else (
×
703
      t.auxiliary_data <- Some data ;
704
      `Update_data )
705

706
  let rec valid_tree t =
707
    match%bind Ivar.read t.validity_listener with
×
708
    | `Ok ->
×
709
        return (t.tree, Option.value_exn t.auxiliary_data)
×
710
    | `Target_changed _ ->
×
711
        valid_tree t
712

713
  let peek_valid_tree t =
714
    Option.bind (Ivar.peek t.validity_listener) ~f:(function
×
715
      | `Ok ->
×
716
          Some t.tree
717
      | `Target_changed _ ->
×
718
          None )
719

720
  let wait_until_valid t h =
721
    if not (Root_hash.equal h (desired_root_exn t)) then
×
722
      return (`Target_changed (t.desired_root, h))
×
723
    else
724
      Deferred.map (Ivar.read t.validity_listener) ~f:(function
×
725
        | `Target_changed payload ->
×
726
            `Target_changed payload
727
        | `Ok ->
×
728
            `Ok t.tree )
729

730
  let fetch t rh ~data ~equal =
731
    ignore (new_goal t rh ~data ~equal : [ `New | `Repeat | `Update_data ]) ;
×
732
    wait_until_valid t rh
733

734
  let create mt ~logger ~trust_system =
735
    let qr, qw = Linear_pipe.create () in
×
736
    let ar, aw = Linear_pipe.create () in
×
737
    let t =
×
738
      { desired_root = None
739
      ; auxiliary_data = None
740
      ; tree = mt
741
      ; logger
742
      ; trust_system
743
      ; answers = ar
744
      ; answer_writer = aw
745
      ; queries = qw
746
      ; query_reader = qr
747
      ; waiting_parents = Addr.Table.create ()
×
748
      ; waiting_content = Addr.Table.create ()
×
749
      ; validity_listener = Ivar.create ()
×
750
      }
751
    in
752
    don't_wait_for (main_loop t) ;
×
753
    t
×
754

755
  let apply_or_queue_diff _ _ =
756
    (* Need some interface for the diffs, not sure the layering is right here. *)
757
    failwith "todo"
×
758

759
  let merkle_path_at_addr _ = failwith "no"
×
760

761
  let get_account_at_addr _ = failwith "no"
×
762
end
14✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc