• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

MinaProtocol / mina / 2995

25 Nov 2024 08:02PM UTC coverage: 32.818% (-27.9%) from 60.697%
2995

push

buildkite

web-flow
Merge pull request #16378 from MinaProtocol/dkijania/implement_test_ledger_apply_dev

implement test ledger apply dev

23246 of 70834 relevant lines covered (32.82%)

17178.91 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

3.47
/src/lib/syncable_ledger/syncable_ledger.ml
1
open Core_kernel
20✔
2
open Async_kernel
3
open Pipe_lib
4
open Network_peer
5

6
type Structured_log_events.t += Snarked_ledger_synced
7
  [@@deriving register_event { msg = "Snarked database sync'd. All done" }]
×
8

9
(** [funpow n f r] applies [f] to [r] a total of [n] times,
    e.g. [funpow 3 f r = f (f (f r))]. A non-positive [n] returns [r]
    unchanged. *)
let rec funpow n f r =
  if n <= 0 then r
  else
    let next = f r in
    funpow (n - 1) f next
×
12

13
module Query = struct
14
  [%%versioned
15
  module Stable = struct
16
    module V2 = struct
17
      type 'addr t =
20✔
18
        | What_child_hashes of 'addr * int
×
19
            (** What are the hashes of the children of this address? 
20
            If depth > 1 then we get the leaves of a subtree rooted
21
            at address and of the given depth. 
22
            For depth = 1 we have the simplest case with just the 2
23
            direct children.
24
            *)
25
        | What_contents of 'addr
×
26
            (** What accounts are at this address? addr must have depth
27
            tree_depth - account_subtree_height *)
28
        | Num_accounts
×
29
            (** How many accounts are there? Used to size data structure and
30
            figure out what part of the tree is filled in. *)
31
      [@@deriving sexp, yojson, hash, compare]
60✔
32
    end
33

34
    module V1 = struct
35
      type 'addr t =
20✔
36
        | What_child_hashes of 'addr
×
37
            (** What are the hashes of the children of this address? *)
38
        | What_contents of 'addr
×
39
            (** What accounts are at this address? addr must have depth
40
            tree_depth - account_subtree_height *)
41
        | Num_accounts
×
42
            (** How many accounts are there? Used to size data structure and
43
            figure out what part of the tree is filled in. *)
44
      [@@deriving sexp, yojson, hash, compare]
60✔
45

46
      (** Upgrades a V1 query to V2. A V1 [What_child_hashes] only ever asked
          for the two direct children, which is a V2 request of depth 1. *)
      let to_latest : 'a t -> 'a V2.t =
       fun query ->
        match query with
        | What_child_hashes addr ->
            V2.What_child_hashes (addr, 1)
        | What_contents addr ->
            V2.What_contents addr
        | Num_accounts ->
            V2.Num_accounts
53
    end
54
  end]
55
end
56

57
module Answer = struct
58
  [%%versioned
59
  module Stable = struct
60
    module V2 = struct
61
      type ('hash, 'account) t =
20✔
62
        | Child_hashes_are of 'hash Bounded_types.ArrayN4000.Stable.V1.t
×
63
            (** The requested addresses' children have these hashes.
64
            May be any power of 2 number of children, and not necessarily 
65
            immediate children  *)
66
        | Contents_are of 'account list
×
67
            (** The requested address has these accounts *)
68
        | Num_accounts of int * 'hash
×
69
            (** There are this many accounts and the smallest subtree that
70
                contains all non-empty nodes has this hash. *)
71
      [@@deriving sexp, yojson]
60✔
72
    end
73

74
    module V1 = struct
75
      type ('hash, 'account) t =
20✔
76
        | Child_hashes_are of 'hash * 'hash
×
77
            (** The requested address's children have these hashes **)
78
        | Contents_are of 'account list
×
79
            (** The requested address has these accounts *)
80
        | Num_accounts of int * 'hash
×
81
            (** There are this many accounts and the smallest subtree that
82
                contains all non-empty nodes has this hash. *)
83
      [@@deriving sexp, yojson]
60✔
84

85
      (** Upgrades a V1 answer to V2, converting every account with
          [acct_to_latest]. The pair of child hashes becomes a two-element
          array. *)
      let to_latest acct_to_latest answer =
        match answer with
        | Child_hashes_are (left, right) ->
            V2.Child_hashes_are [| left; right |]
        | Contents_are accounts ->
            V2.Contents_are (List.map accounts ~f:acct_to_latest)
        | Num_accounts (count, hash) ->
            V2.Num_accounts (count, hash)
92

93
      (* Not a standard versioning function *)

      (** Attempts to downgrade a V2 answer to V1. Only a [Child_hashes_are]
          answer carrying exactly two hashes can be represented in V1; any
          other array length yields an error. *)
      let from_v2 : ('a, 'b) V2.t -> ('a, 'b) t Or_error.t = function
        | Child_hashes_are [| left; right |] ->
            Ok (Child_hashes_are (left, right))
        | Child_hashes_are _ ->
            Or_error.error_string "can't downgrade wide query"
        | Contents_are accounts ->
            Ok (Contents_are accounts)
        | Num_accounts (count, hash) ->
            Ok (Num_accounts (count, hash))
104
    end
105
  end]
106
end
107

108
module type CONTEXT = sig
109
  val logger : Logger.t
110

111
  val compile_config : Mina_compile_config.t
112
end
113

114
module type Inputs_intf = sig
115
  module Addr : module type of Merkle_address
116

117
  module Account : sig
118
    type t [@@deriving bin_io, sexp, yojson]
119
  end
120

121
  module Hash : Merkle_ledger.Intf.Hash with type account := Account.t
122

123
  module Root_hash : sig
124
    type t [@@deriving equal, sexp, yojson]
125

126
    val to_hash : t -> Hash.t
127
  end
128

129
  module MT :
130
    Merkle_ledger.Intf.SYNCABLE
131
      with type hash := Hash.t
132
       and type root_hash := Root_hash.t
133
       and type addr := Addr.t
134
       and type account := Account.t
135

136
  val account_subtree_height : int
137
end
138

139
module type S = sig
140
  type 'a t [@@deriving sexp]
141

142
  type merkle_tree
143

144
  type merkle_path
145

146
  type hash
147

148
  type root_hash
149

150
  type addr
151

152
  type diff
153

154
  type account
155

156
  type index = int
157

158
  type query
159

160
  type answer
161

162
  module Responder : sig
163
    type t
164

165
    val create :
166
         merkle_tree
167
      -> (query -> unit)
168
      -> context:(module CONTEXT)
169
      -> trust_system:Trust_system.t
170
      -> t
171

172
    val answer_query :
173
      t -> query Envelope.Incoming.t -> answer Or_error.t Deferred.t
174
  end
175

176
  val create :
177
       merkle_tree
178
    -> context:(module CONTEXT)
179
    -> trust_system:Trust_system.t
180
    -> 'a t
181

182
  val answer_writer :
183
       'a t
184
    -> (root_hash * query * answer Envelope.Incoming.t) Linear_pipe.Writer.t
185

186
  val query_reader : 'a t -> (root_hash * query) Linear_pipe.Reader.t
187

188
  val destroy : 'a t -> unit
189

190
  val new_goal :
191
       'a t
192
    -> root_hash
193
    -> data:'a
194
    -> equal:('a -> 'a -> bool)
195
    -> [ `Repeat | `New | `Update_data ]
196

197
  val peek_valid_tree : 'a t -> merkle_tree option
198

199
  val valid_tree : 'a t -> (merkle_tree * 'a) Deferred.t
200

201
  val wait_until_valid :
202
       'a t
203
    -> root_hash
204
    -> [ `Ok of merkle_tree | `Target_changed of root_hash option * root_hash ]
205
       Deferred.t
206

207
  val fetch :
208
       'a t
209
    -> root_hash
210
    -> data:'a
211
    -> equal:('a -> 'a -> bool)
212
    -> [ `Ok of merkle_tree | `Target_changed of root_hash option * root_hash ]
213
       Deferred.t
214

215
  val apply_or_queue_diff : 'a t -> diff -> unit
216

217
  val merkle_path_at_addr : 'a t -> addr -> merkle_path Or_error.t
218

219
  val get_account_at_addr : 'a t -> addr -> account Or_error.t
220
end
221

222
(*
223

224
Every node of the merkle tree is always in one of three states:
225

226
- Fresh.
227
  The current contents for this node in the MT match what we
228
  expect.
229
- Stale
230
  The current contents for this node in the MT do _not_ match
231
  what we expect.
232
- Unknown.
233
  We don't know what to expect yet.
234

235

236
Although every node conceptually has one of these states, and can
237
make a transition at any time, the syncer operates only along a
238
"frontier" of the tree, which consists of the deepest Stale nodes.
239

240
The goal of the ledger syncer is to make the root node be fresh,
241
starting from it being stale.
242

243
The syncer usually operates exclusively on these frontier nodes
244
and their direct children. However, the goal hash can change
245
while the syncer is running, and at that point every non-root node
246
conceptually becomes Unknown, and we need to restart. However, we
247
don't need to restart completely: in practice, only small portions
248
of the merkle tree change between goals, and we can re-use the "Stale"
249
nodes we already have if the expected hash doesn't change.
250

251
*)
252
(*
253
Note: while syncing, the underlying ledger is in an
254
indeterminate state. We're mutating hashes at internal
255
nodes without updating their children. In fact, we
256
don't even set all the hashes for the internal nodes!
257
(When we hit a height=N subtree, we don't do anything
258
with the hashes in the bottommost N-1 internal nodes).
259
*)
260

261
module Make (Inputs : Inputs_intf) : sig
262
  open Inputs
263

264
  include
265
    S
266
      with type merkle_tree := MT.t
267
       and type hash := Hash.t
268
       and type root_hash := Root_hash.t
269
       and type addr := Addr.t
270
       and type merkle_path := MT.path
271
       and type account := Account.t
272
       and type query := Addr.t Query.t
273
       and type answer := (Hash.t, Account.t) Answer.t
274
end = struct
275
  open Inputs
276

277
  type diff = unit
278

279
  type index = int
280

281
  type answer = (Hash.t, Account.t) Answer.t
282

283
  type query = Addr.t Query.t
284

285
  (* Provides the [2^i] addresses located [i] levels below [addr], in index
     order. Each one is built with [Addr.extend_exn], so this presumably raises
     when [addr] cannot be extended by [i] bits within [ledger_depth] — confirm
     against Merkle_address. *)
  let intermediate_range ledger_depth addr i =
    let count = 1 lsl i in
    let descendant idx =
      Addr.extend_exn ~ledger_depth addr ~num_bits:i (Int64.of_int idx)
    in
    Array.init count ~f:descendant
×
289

290
  module Responder = struct
291
    type t =
292
      { mt : MT.t
293
      ; f : query -> unit
294
      ; context : (module CONTEXT)
295
      ; trust_system : Trust_system.t
296
      }
297

298
    (** Builds a responder over the ledger [mt]. [f] is the callback that
        [answer_query] invokes on every incoming query. *)
    let create (mt : MT.t) (f : query -> unit) ~(context : (module CONTEXT))
        ~(trust_system : Trust_system.t) : t =
      { mt; f; context; trust_system }
×
305

306
    (** [answer_query responder query_envelope] computes the answer to a sync
        query received from a peer.

        The responder's callback [f] is invoked on the query first. A query
        that can be served yields [Ok answer]. A query that breaks the protocol
        (a [What_contents] subtree taller than [account_subtree_height], an
        empty subtree, an address whose child hashes cannot be read, or a
        requested depth below 1) is recorded against the sender in the trust
        system and yields an error carrying the violation message. *)
    let answer_query :
        t -> query Envelope.Incoming.t -> answer Or_error.t Deferred.t =
     fun { mt; f; context; trust_system } query_envelope ->
      let open (val context) in
      let open Trust_system in
      let ledger_depth = MT.depth mt in
      let sender = Envelope.Incoming.sender query_envelope in
      let query = Envelope.Incoming.data query_envelope in
      f query ;
      let response_or_punish =
        match query with
        | What_contents a ->
            if Addr.height ~ledger_depth a > account_subtree_height then
              Either.Second
                ( Actions.Violated_protocol
                , Some
                    ( "Requested too big of a subtree at once"
                    , [ ("addr", Addr.to_yojson a) ] ) )
            else
              let addresses_and_accounts =
                List.sort ~compare:(fun (addr1, _) (addr2, _) ->
                    Addr.compare addr1 addr2 )
                @@ MT.get_all_accounts_rooted_at_exn mt a
                (* can't actually throw *)
              in
              let addresses, accounts = List.unzip addresses_and_accounts in
              if List.is_empty addresses then
                (* Peer should know what portions of the tree are full from the
                   Num_accounts query. *)
                Either.Second
                  ( Actions.Violated_protocol
                  , Some
                      ("Requested empty subtree", [ ("addr", Addr.to_yojson a) ])
                  )
              else
                let first_address, rest_address =
                  (List.hd_exn addresses, List.tl_exn addresses)
                in
                (* Walk the sorted addresses checking that each one is the
                   successor of the previous one; stop advancing at the first
                   gap so that [missing_address] names it. *)
                let missing_address, is_compact =
                  List.fold rest_address
                    ~init:(Addr.next first_address, true)
                    ~f:(fun (expected_address, is_compact) actual_address ->
                      if
                        is_compact
                        && [%equal: Addr.t option] expected_address
                             (Some actual_address)
                      then (Addr.next actual_address, true)
                      else (expected_address, false) )
                in
                if not is_compact then (
                  (* indicates our ledger is invalid somehow. *)
                  [%log fatal]
                    ~metadata:
                      [ ( "missing_address"
                        , Addr.to_yojson (Option.value_exn missing_address) )
                      ; ( "addresses_and_accounts"
                        , `List
                            (List.map addresses_and_accounts
                               ~f:(fun (addr, account) ->
                                 `Tuple
                                   [ Addr.to_yojson addr
                                   ; Account.to_yojson account
                                   ] ) ) )
                      ]
                    "Missing an account at address: $missing_address inside \
                     the list: $addresses_and_accounts" ;
                  assert false )
                else Either.First (Answer.Contents_are accounts)
        | Num_accounts ->
            let len = MT.num_accounts mt in
            let height = Int.ceil_log2 len in
            (* FIXME: bug when height=0 https://github.com/o1-labs/nanobit/issues/365 *)
            let content_root_addr =
              funpow
                (MT.depth mt - height)
                (fun a -> Addr.child_exn ~ledger_depth a Direction.Left)
                (Addr.root ())
            in
            Either.First
              (Num_accounts
                 (len, MT.get_inner_hash_at_addr_exn mt content_root_addr) )
        | What_child_hashes (a, subtree_depth) -> (
            match subtree_depth with
            | n when n >= 1 -> (
                (* Never serve more levels than the configured maximum. *)
                let subtree_depth =
                  min n compile_config.sync_ledger_max_subtree_depth
                in
                let ledger_depth = MT.depth mt in
                match
                  Or_error.try_with (fun () ->
                      (* The address range is built inside [try_with]: it is
                         derived from the peer-supplied address through
                         [Addr.extend_exn], so a failure there must be
                         reported as an invalid address like a failed hash
                         lookup, not escape as an exception. *)
                      let addresses =
                        intermediate_range ledger_depth a subtree_depth
                      in
                      let get_hash a = MT.get_inner_hash_at_addr_exn mt a in
                      let hashes = Array.map addresses ~f:get_hash in
                      Answer.Child_hashes_are hashes )
                with
                | Ok answer ->
                    Either.First answer
                | Error e ->
                    [%log error]
                      ~metadata:[ ("error", Error_json.error_to_yojson e) ]
                      "When handling What_child_hashes request, the following \
                       error happended: $error" ;
                    Either.Second
                      ( Actions.Violated_protocol
                      , Some
                          ( "Invalid address in What_child_hashes request"
                          , [ ("addr", Addr.to_yojson a) ] ) ) )
            | _ ->
                [%log error]
                  "When handling What_child_hashes request, the depth was \
                   outside the valid range" ;
                Either.Second
                  ( Actions.Violated_protocol
                  , Some
                      ( "Invalid depth requested in What_child_hashes request"
                      , [ ("addr", Addr.to_yojson a) ] ) ) )
      in

      match response_or_punish with
      | Either.First answer ->
          Deferred.return @@ Ok answer
      | Either.Second action ->
          let%map _ =
            record_envelope_sender trust_system logger sender action
          in
          let err =
            Option.value_map ~default:"Violated protocol" (snd action) ~f:fst
          in
          Or_error.error_string err
×
437
  end
438

439
  type 'a t =
440
    { mutable desired_root : Root_hash.t option
441
    ; mutable auxiliary_data : 'a option
442
    ; tree : MT.t
443
    ; trust_system : Trust_system.t
444
    ; answers :
445
        (Root_hash.t * query * answer Envelope.Incoming.t) Linear_pipe.Reader.t
446
    ; answer_writer :
447
        (Root_hash.t * query * answer Envelope.Incoming.t) Linear_pipe.Writer.t
448
    ; queries : (Root_hash.t * query) Linear_pipe.Writer.t
449
    ; query_reader : (Root_hash.t * query) Linear_pipe.Reader.t
450
    ; waiting_parents : Hash.t Addr.Table.t
451
          (** Addresses we are waiting for the children of, and the expected
452
              hash of the node with the address. *)
453
    ; waiting_content : Hash.t Addr.Table.t
454
    ; mutable validity_listener :
455
        [ `Ok | `Target_changed of Root_hash.t option * Root_hash.t ] Ivar.t
456
    ; context : (module CONTEXT)
457
    }
458

459
  (* S-expression conversion is required by the signature but deliberately
     unimplemented: both functions raise as soon as they receive their first
     argument. *)
  let t_of_sexp = function _ -> failwith "t_of_sexp: not implemented"

  let sexp_of_t = function _ -> failwith "sexp_of_t: not implemented"
×
462

463
  (* Returns the current sync target; raises if no target has been set. *)
  let desired_root_exn t = Option.value_exn t.desired_root
×
464

465
  (* Shuts the syncer down by closing the read ends of the answer pipe and of
     the query pipe. *)
  let destroy { answers; query_reader; _ } =
    Linear_pipe.close_read answers ;
    Linear_pipe.close_read query_reader
×
468

469
  (* Write end of the pipe on which answers are delivered to the syncer. *)
  let answer_writer { answer_writer; _ } = answer_writer

  (* Read end of the pipe on which the syncer emits its queries. *)
  let query_reader { query_reader; _ } = query_reader
×
472

473
  (* Registers [parent_addr] as awaiting its children, remembering the hash the
     node at that address is expected to have. Raises (via [add_exn]) if the
     address is already registered. *)
  let expect_children (t : 'a t) (parent_addr : Addr.t) (expected : Hash.t) :
      unit =
    let open (val t.context) in
    let metadata =
      [ ("parent_address", Addr.to_yojson parent_addr)
      ; ("hash", Hash.to_yojson expected)
      ]
    in
    [%log trace] ~metadata
      "Expecting children parent $parent_address, expected: $hash" ;
    Addr.Table.add_exn t.waiting_parents ~key:parent_addr ~data:expected
×
483

484
  (* Registers [addr] as awaiting its account contents, remembering the hash
     the subtree at that address is expected to have. Raises (via [add_exn]) if
     the address is already registered. *)
  let expect_content (t : 'a t) (addr : Addr.t) (expected : Hash.t) : unit =
    let open (val t.context) in
    let metadata =
      [ ("address", Addr.to_yojson addr); ("hash", Hash.to_yojson expected) ]
    in
    [%log trace] ~metadata "Expecting content addr $address, expected: $hash" ;
    Addr.Table.add_exn t.waiting_content ~key:addr ~data:expected
×
492

493
  (** Writes the accounts [content] into the subtree rooted at [addr] and
      checks the resulting subtree hash against the one registered in
      [waiting_content].

      The address is removed from [waiting_content] whatever the outcome.
      Returns [`Hash_mismatch (expected, actual)] when the hashes differ.
      Raises if [addr] was not registered in [waiting_content]. *)
  let add_content (t : 'a t) (addr : Addr.t) (content : Account.t list) :
      [ `Success | `Hash_mismatch of Hash.t * Hash.t ] =
    let open (val t.context) in
    let expected_hash = Addr.Table.find_exn t.waiting_content addr in
    (* TODO #444 should we batch all the updates and do them at the end? *)
    (* The accounts are written before they are verified: if they turn out to
       be wrong the caller requeues the address and they get overwritten. *)
    MT.set_all_accounts_rooted_at_exn t.tree addr content ;
    Addr.Table.remove t.waiting_content addr ;
    let metadata =
      [ ("address", Addr.to_yojson addr)
      ; ("hash", Hash.to_yojson expected_hash)
      ]
    in
    [%log trace] ~metadata
      "Found content addr $address, with hash $hash, removing from waiting \
       content" ;
    let actual_hash = MT.get_inner_hash_at_addr_exn t.tree addr in
    match Hash.equal actual_hash expected_hash with
    | true ->
        `Success
    | false ->
        `Hash_mismatch (expected_hash, actual_hash)
×
517

518
  (* Merges each pair of contiguous nodes with [Hash.merge ~height], producing
     an array half as long. Fails if the array has odd length. *)
  let merge_siblings (nodes : Hash.t array) (height : index) : Hash.t array =
    let count = Array.length nodes in
    match count mod 2 with
    | 0 ->
        Array.init (count / 2) ~f:(fun pair ->
            let left = nodes.(2 * pair) and right = nodes.((2 * pair) + 1) in
            Hash.merge ~height left right )
    | _ ->
        failwith "length must be even"
526

527
  (* Merges [nodes] level by level into their common root. [height] is the
     height passed to [Hash.merge] for the first round and grows by one per
     round. The array length is assumed to be a power of 2: an odd length
     greater than 1 fails inside [merge_siblings], and an empty array is
     rejected here because halving it would never reach a single node. *)
  let rec merge_many : Hash.t array -> index -> Hash.t =
   fun nodes height ->
    match Array.length nodes with
    | 0 ->
        failwith "merge_many: cannot merge an empty array of hashes"
    | 1 ->
        nodes.(0)
    | _ ->
        merge_many (merge_siblings nodes height) (height + 1)
×
537

538
  (* Merges the [nodes] of a subtree whose root sits at [height] and whose
     leaves lie [subtree_depth] levels below it; merging therefore starts at
     height [height - subtree_depth]. Shadows the recursive helper above. *)
  let merge_many (nodes : Hash.t array) (height : index)
      (subtree_depth : index) : Hash.t =
    merge_many nodes (height - subtree_depth)
×
543

544
  (* Checks the subtree answered for [addr], given as the [2^k] hashes [nodes]
     lying [k] levels below [addr], against the hash registered for [addr] in
     [waiting_parents].

     Returns:
     - [`Invalid_length] when the number of hashes is not a power of 2, is
       smaller than 2, or implies a depth greater than [requested_depth];
     - [`Hash_mismatch (expected, merged)] when the hashes do not merge to the
       expected hash;
     - [`Good to_fetch] otherwise, after removing [addr] from
       [waiting_parents]; [to_fetch] holds the nodes to be checked next, i.e.
       the received (address, hash) pairs whose hash differs from the one
       currently stored in the ledger.

     Raises if [addr] was never registered in [waiting_parents]. *)
  let add_subtree :
         'a t
      -> Addr.t
      -> Hash.t array
      -> int
      -> [ `Good of (Addr.t * Hash.t) array
         | `Hash_mismatch of Hash.t * Hash.t
         | `Invalid_length ] =
   fun t addr nodes requested_depth ->
    let len = Array.length nodes in
    (* [nodes] comes from a peer. The length is bounded below before calling
       [Int.is_pow2] / [Int.ceil_log2], which reject non-positive arguments:
       an empty answer must be reported as invalid, not raise. *)
    if len < 2 || not (Int.is_pow2 len) then `Invalid_length
    else
      let subtree_depth = Int.ceil_log2 len in
      if subtree_depth > requested_depth then `Invalid_length
      else
        let ledger_depth = MT.depth t.tree in
        let expected =
          Option.value_exn ~message:"Forgot to wait for a node"
            (Addr.Table.find t.waiting_parents addr)
        in
        let merged =
          merge_many nodes (ledger_depth - Addr.depth addr) subtree_depth
        in
        if Hash.equal expected merged then (
          Addr.Table.remove t.waiting_parents addr ;
          let addresses = intermediate_range ledger_depth addr subtree_depth in
          let addresses_and_hashes = Array.zip_exn addresses nodes in

          (* Filter to fetch only those that differ *)
          let should_fetch_children addr hash =
            not @@ Hash.equal (MT.get_inner_hash_at_addr_exn t.tree addr) hash
          in
          let subtrees_to_fetch =
            addresses_and_hashes
            |> Array.filter ~f:(Tuple2.uncurry should_fetch_children)
          in
          `Good subtrees_to_fetch )
        else `Hash_mismatch (expected, merged)
×
587

588
  (* Signals completion of the sync by filling [validity_listener] with [`Ok].
     Fails if the ledger's merkle root does not match the desired root. An
     error is logged beforehand when the listener is already full. *)
  let all_done t =
    let open (val t.context) in
    if Root_hash.equal (MT.merkle_root t.tree) (desired_root_exn t) then (
      let listener = t.validity_listener in
      if Ivar.is_full listener then [%log error] "Ivar.fill bug is here!" ;
      Ivar.fill listener `Ok )
    else failwith "We finished syncing, but made a mistake somewhere :("
×
596

597
  (** Computes the hash of an empty tree of height [h]: starting from
      [Hash.empty_account], the running hash is merged with itself once per
      level from 0 up to [h - 1]. The recursion stops only when the level
      equals [h], so [h] is expected to be non-negative. *)
  let empty_hash_at_height h =
    let rec climb level acc =
      if level <> h then climb (level + 1) (Hash.merge ~height:level acc acc)
      else acc
    in
    climb 0 Hash.empty_account
603

604
  (** Computes the hash of the whole tree (height [result_height]) from [hash],
      the hash of the subtree of height [start_height] that contains every
      account. At each level the running hash is merged, as the left child,
      with the hash of an empty subtree of the same height.

      The recursion stops only when the level equals [result_height], so
      [start_height] is expected not to exceed [result_height]. *)
  let complete_with_empties hash start_height result_height =
    let rec climb ~level ~subtree ~empty =
      if level = result_height then subtree
      else
        let parent = Hash.merge ~height:level subtree empty in
        let bigger_empty = Hash.merge ~height:level empty empty in
        climb ~level:(level + 1) ~subtree:parent ~empty:bigger_empty
    in
    climb ~level:start_height ~subtree:hash
      ~empty:(empty_hash_at_height start_height)
×
616

617
  (** Starts fetching what lies below [addr], whose subtree is expected to
      hash to [exp_hash]. An address at or below depth
      [MT.depth - account_subtree_height] is registered as awaiting contents
      and queried with [What_contents]; any other address is registered as
      awaiting children and queried with [What_child_hashes] at the configured
      default subtree depth. The query is written only if the pipe is open. *)
  let handle_node t addr exp_hash =
    let open (val t.context) in
    let content_depth = MT.depth t.tree - account_subtree_height in
    let query : query =
      if Addr.depth addr >= content_depth then (
        expect_content t addr exp_hash ;
        Query.What_contents addr )
      else (
        expect_children t addr exp_hash ;
        Query.What_child_hashes
          (addr, compile_config.sync_ledger_default_subtree_depth) )
    in
    Linear_pipe.write_without_pushback_if_open t.queries
      (desired_root_exn t, query)
632

633
  (** Handle the initial Num_accounts message, starting the main syncing
      process.

      [n] is the account count claimed by the peer and [content_hash] the
      claimed hash of the smallest subtree containing every account. The root
      hash they imply is compared with the desired root: on a match both
      waiting tables are cleared, syncing starts from the root and [`Success]
      is returned; otherwise [`Hash_mismatch (expected, actual)] is returned.

      A claim that cannot describe this ledger (a non-positive [n], or an [n]
      needing a subtree taller than the ledger) is reported as
      [`Hash_mismatch (expected, content_hash)] without computing anything. *)
  let handle_num_accounts :
      'a t -> int -> Hash.t -> [ `Success | `Hash_mismatch of Hash.t * Hash.t ]
      =
   fun t n content_hash ->
    let rh = Root_hash.to_hash (desired_root_exn t) in
    let ledger_depth = MT.depth t.tree in
    (* [n] comes from a peer. [Int.ceil_log2] rejects non-positive arguments,
       and [complete_with_empties] only terminates when the start height does
       not exceed the result height, so both are guarded here. *)
    let implied_root =
      if n <= 0 then None
      else
        let height = Int.ceil_log2 n in
        (* FIXME: bug when height=0 https://github.com/o1-labs/nanobit/issues/365 *)
        if height > ledger_depth then None
        else Some (complete_with_empties content_hash height ledger_depth)
    in
    match implied_root with
    | None ->
        `Hash_mismatch (rh, content_hash)
    | Some actual ->
        if Hash.equal actual rh then (
          Addr.Table.clear t.waiting_parents ;
          (* We should use this information to set the empty account slots empty and
             start syncing at the content root. See #1972. *)
          Addr.Table.clear t.waiting_content ;
          handle_node t (Addr.root ()) rh ;
          `Success )
        else `Hash_mismatch (rh, actual)
×
651

652
  (* Processes every (root hash, query, answer) triple arriving on [t.answers].

     An answer for a root other than the desired one, or one arriving after
     the sync has completed, is ignored. Otherwise the answer is applied
     according to the query it responds to: a good answer credits the sender
     in the trust system; a bad one (hash mismatch, invalid length, or an
     answer of the wrong kind) is recorded against the sender and the query is
     requeued, since the information is still needed. After each processed
     answer, if the ledger root equals the desired root, completion is
     signalled through [all_done]. *)
  let main_loop t =
    let open (val t.context) in
    let handle_answer :
           Root_hash.t
           * Addr.t Query.t
           * (Hash.t, Account.t) Answer.t Envelope.Incoming.t
        -> unit Deferred.t =
     fun (root_hash, query, env) ->
      (* NOTE: think about synchronization here. This is deferred now, so
         the t and the underlying ledger can change while processing is
         happening. *)
      let already_done =
        match Ivar.peek t.validity_listener with Some `Ok -> true | _ -> false
      in
      let sender = Envelope.Incoming.sender env in
      let answer = Envelope.Incoming.data env in
      [%log trace]
        ~metadata:
          [ ("root_hash", Root_hash.to_yojson root_hash)
          ; ("query", Query.to_yojson Addr.to_yojson query)
          ]
        "Handle answer for $root_hash" ;
      if not (Root_hash.equal root_hash (desired_root_exn t)) then (
        [%log trace]
          ~metadata:
            [ ("desired_hash", Root_hash.to_yojson (desired_root_exn t))
            ; ("ignored_hash", Root_hash.to_yojson root_hash)
            ]
          "My desired root was $desired_hash, so I'm ignoring $ignored_hash" ;
        Deferred.unit )
      else if already_done then (
        (* This can happen if we asked for hashes that turn out to be equal in
           underlying ledger and the target. *)
        [%log debug] "Got sync response when we're already finished syncing" ;
        Deferred.unit )
      else
        let open Trust_system in
        (* If a peer misbehaves we still need the information we asked them for,
           so requeue in that case. *)
        let requeue_query () =
          Linear_pipe.write_without_pushback_if_open t.queries (root_hash, query)
        in
        let credit_fulfilled_request () =
          record_envelope_sender t.trust_system logger sender
            ( Actions.Fulfilled_request
            , Some
                ( "sync ledger query $query"
                , [ ("query", Query.to_yojson Addr.to_yojson query) ] ) )
        in
        let%bind _ =
          match (query, answer) with
          | Query.What_contents addr, Answer.Contents_are leaves -> (
              match add_content t addr leaves with
              | `Success ->
                  credit_fulfilled_request ()
              | `Hash_mismatch (expected, actual) ->
                  let%map () =
                    record_envelope_sender t.trust_system logger sender
                      ( Actions.Sent_bad_hash
                      , Some
                          ( "sent accounts $accounts for address $addr, they \
                             hash to $actual but we expected $expected"
                          , [ ( "accounts"
                              , `List (List.map ~f:Account.to_yojson leaves) )
                            ; ("addr", Addr.to_yojson addr)
                            ; ("actual", Hash.to_yojson actual)
                            ; ("expected", Hash.to_yojson expected)
                            ] ) )
                  in
                  requeue_query () )
          | Query.Num_accounts, Answer.Num_accounts (count, content_root) -> (
              match handle_num_accounts t count content_root with
              | `Success ->
                  credit_fulfilled_request ()
              | `Hash_mismatch (expected, actual) ->
                  let%map () =
                    record_envelope_sender t.trust_system logger sender
                      ( Actions.Sent_bad_hash
                      , Some
                          ( "Claimed num_accounts $count, content root hash \
                             $content_root_hash, that implies a root hash of \
                             $actual, we expected $expected"
                          , [ ("count", `Int count)
                            ; ("content_root_hash", Hash.to_yojson content_root)
                            ; ("actual", Hash.to_yojson actual)
                            ; ("expected", Hash.to_yojson expected)
                            ] ) )
                  in
                  requeue_query () )
          | ( Query.What_child_hashes (address, requested_depth)
            , Answer.Child_hashes_are hashes ) -> (
              match add_subtree t address hashes requested_depth with
              | `Hash_mismatch (expected, actual) ->
                  let%map () =
                    record_envelope_sender t.trust_system logger sender
                      ( Actions.Sent_bad_hash
                      , Some
                          ( "hashes sent for subtree on address $address merge \
                             to $actual_merge but we expected $expected_merge"
                          , [ (* [$address] is interpolated by the message, so
                                 it has to be present in the metadata. *)
                              ("address", Addr.to_yojson address)
                            ; ("actual_merge", Hash.to_yojson actual)
                            ; ("expected_merge", Hash.to_yojson expected)
                            ] ) )
                  in
                  requeue_query ()
              | `Invalid_length ->
                  let%map () =
                    record_envelope_sender t.trust_system logger sender
                      ( Actions.Sent_bad_hash
                      , Some
                          ( "hashes sent for subtree on address $address must \
                             be a power of 2 in the range 2-2^$depth"
                          , [ ("address", Addr.to_yojson address)
                              (* The bound [add_subtree] enforces is the depth
                                 we requested, so that is the one reported. *)
                            ; ("depth", `Int requested_depth)
                            ] ) )
                  in
                  requeue_query ()
              | `Good children_to_verify ->
                  Array.iter children_to_verify ~f:(fun (addr, hash) ->
                      handle_node t addr hash ) ;
                  credit_fulfilled_request () )
          | query, answer ->
              let%map () =
                record_envelope_sender t.trust_system logger sender
                  ( Actions.Violated_protocol
                  , Some
                      ( "Answered question we didn't ask! Query was $query \
                         answer was $answer"
                      , [ ("query", Query.to_yojson Addr.to_yojson query)
                        ; ( "answer"
                          , Answer.to_yojson Hash.to_yojson Account.to_yojson
                              answer )
                        ] ) )
              in
              requeue_query ()
        in
        (* The answer just applied may have completed the ledger. *)
        if
          Root_hash.equal
            (Option.value_exn t.desired_root)
            (MT.merkle_root t.tree)
        then (
          [%str_log trace] Snarked_ledger_synced ;
          all_done t ) ;
        Deferred.unit
    in
    Linear_pipe.iter t.answers ~f:handle_answer
799

800
  (** [new_goal t h ~data ~equal] points the sync at root hash [h] with
      auxiliary data [data].

      - If [h] differs from the current desired root (or none is set yet):
        the current validity listener is filled with [`Target_changed] if it
        is still empty and is then replaced by a fresh one, the new root and
        data are stored, a [Num_accounts] query for [h] is written to the
        query pipe if it is open, and [`New] is returned.
      - If [h] already is the desired root and [equal data saved] holds for
        the stored data, nothing is changed and [`Repeat] is returned.
      - Otherwise only the stored auxiliary data is replaced and
        [`Update_data] is returned. *)
  let new_goal t h ~data ~equal =
    let open (val t.context) in
    let previous_root = t.desired_root in
    let same_target =
      match previous_root with
      | Some current ->
          Root_hash.equal h current
      | None ->
          false
    in
    (* Only consulted when the target hash is unchanged. *)
    let data_unchanged () =
      match t.auxiliary_data with
      | Some saved_data ->
          equal data saved_data
      | None ->
          false
    in
    if same_target then (
      if data_unchanged () then (
        [%log debug] "New_goal to same hash, not doing anything" ;
        `Repeat )
      else (
        t.auxiliary_data <- Some data ;
        `Update_data ) )
    else (
      ( match previous_root with
      | Some root_hash ->
          [%log debug]
            ~metadata:
              [ ("old_root_hash", Root_hash.to_yojson root_hash)
              ; ("new_root_hash", Root_hash.to_yojson h)
              ]
            "New_goal: changing target from $old_root_hash to $new_root_hash"
      | None ->
          () ) ;
      (* Notify anyone waiting on the old target before swapping in a fresh
         listener for the new one. *)
      Ivar.fill_if_empty t.validity_listener
        (`Target_changed (previous_root, h)) ;
      t.validity_listener <- Ivar.create () ;
      t.desired_root <- Some h ;
      t.auxiliary_data <- Some data ;
      Linear_pipe.write_without_pushback_if_open t.queries (h, Num_accounts) ;
      `New )
833

834
  (** [valid_tree t] waits on [t.validity_listener]. When it is filled with
      [`Ok], the result is the tree paired with the stored auxiliary data.
      When it is filled with [`Target_changed], [t.validity_listener] is read
      again and the wait continues.

      Raises (through [Option.value_exn]) if no auxiliary data is stored at
      the moment [`Ok] is observed. *)
  let rec valid_tree t =
    Deferred.bind (Ivar.read t.validity_listener) ~f:(function
      | `Target_changed _ ->
          valid_tree t
      | `Ok ->
          return (t.tree, Option.value_exn t.auxiliary_data) )
840

841
  (** [peek_valid_tree t] is a non-blocking check of the validity listener:
      [Some t.tree] if it has already been filled with [`Ok], and [None] if
      it is still empty or was filled with [`Target_changed]. *)
  let peek_valid_tree t =
    match Ivar.peek t.validity_listener with
    | Some `Ok ->
        Some t.tree
    | Some (`Target_changed _) | None ->
        None
847

848
  (** [wait_until_valid t h] resolves immediately to
      [`Target_changed (t.desired_root, h)] when [h] is not the current
      desired root. Otherwise it waits for the validity listener and maps
      [`Ok] to [`Ok t.tree], passing a [`Target_changed] payload through
      unchanged.

      Uses [desired_root_exn], so a desired root is expected to be set. *)
  let wait_until_valid t h =
    if Root_hash.equal h (desired_root_exn t) then
      Ivar.read t.validity_listener
      |> Deferred.map ~f:(fun outcome ->
             match outcome with
             | `Ok ->
                 `Ok t.tree
             | `Target_changed payload ->
                 `Target_changed payload )
    else return (`Target_changed (t.desired_root, h))
857

858
  (** [fetch t rh ~data ~equal] sets [rh] as the goal through [new_goal]
      (discarding which of [`New], [`Repeat] or [`Update_data] it reports)
      and then waits through [wait_until_valid t rh]. *)
  let fetch t rh ~data ~equal =
    let (_ : [ `New | `Repeat | `Update_data ]) =
      new_goal t rh ~data ~equal
    in
    wait_until_valid t rh
861

862
  (** [create mt ~context ~trust_system] builds a sync state around the tree
      [mt] with no desired root and no auxiliary data, fresh query and answer
      pipes, empty waiting tables and an unfilled validity listener. It then
      starts [main_loop] in the background and returns the new state. *)
  let create mt ~context ~trust_system =
    let query_reader, queries = Linear_pipe.create () in
    let answers, answer_writer = Linear_pipe.create () in
    let waiting_parents = Addr.Table.create () in
    let waiting_content = Addr.Table.create () in
    let validity_listener = Ivar.create () in
    let t =
      { desired_root = None
      ; auxiliary_data = None
      ; tree = mt
      ; trust_system
      ; answers
      ; answer_writer
      ; queries
      ; query_reader
      ; waiting_parents
      ; waiting_content
      ; validity_listener
      ; context
      }
    in
    (* The answer-processing loop runs in the background; its deferred is
       not awaited here. *)
    don't_wait_for (main_loop t) ;
    t
×
882

883
  (** Unimplemented placeholder: ignores both arguments and always raises
      [Failure "todo"].

      An interface for the diffs is still needed, and it is not yet clear
      that the layering is right here. *)
  let apply_or_queue_diff _t _diff = failwith "todo"
×
886

887
  (** Unsupported placeholder: ignores its argument and always raises
      [Failure "no"]. *)
  let merkle_path_at_addr _unused = failwith "no"
×
888

889
  (** Unsupported placeholder: ignores its argument and always raises
      [Failure "no"]. *)
  let get_account_at_addr _unused = failwith "no"
×
890
end
40✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc