• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

MinaProtocol / mina / 3409

26 Feb 2025 01:10PM UTC coverage: 32.353% (-28.4%) from 60.756%
3409

push

buildkite

web-flow
Merge pull request #16687 from MinaProtocol/dw/merge-compatible-into-develop-20250225

Merge compatible into develop [20250224]

23144 of 71535 relevant lines covered (32.35%)

16324.05 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

3.44
/src/lib/syncable_ledger/syncable_ledger.ml
1
open Core_kernel
20✔
2
open Async_kernel
3
open Pipe_lib
4
open Network_peer
5

6
type Structured_log_events.t += Snarked_ledger_synced
7
  [@@deriving register_event { msg = "Snarked database sync'd. All done" }]
×
8

9
(** Run f recursively n times, starting with value r.
10
    e.g. funpow 3 f r = f (f (f r)) *)
11
let rec funpow n f r = if n > 0 then funpow (n - 1) f (f r) else r
×
12

13
module Query = struct
14
  [%%versioned
15
  module Stable = struct
16
    module V2 = struct
17
      type 'addr t =
20✔
18
        | What_child_hashes of 'addr * int
×
19
            (** What are the hashes of the children of this address? 
20
            If depth > 1 then we get the leaves of a subtree rooted
21
            at address and of the given depth. 
22
            For depth = 1 we have the simplest case with just the 2
23
            direct children.
24
            *)
25
        | What_contents of 'addr
×
26
            (** What accounts are at this address? addr must have depth
27
            tree_depth - account_subtree_height *)
28
        | Num_accounts
×
29
            (** How many accounts are there? Used to size data structure and
30
            figure out what part of the tree is filled in. *)
31
      [@@deriving sexp, yojson, hash, compare]
60✔
32
    end
33

34
    module V1 = struct
35
      type 'addr t =
20✔
36
        | What_child_hashes of 'addr
×
37
            (** What are the hashes of the children of this address? *)
38
        | What_contents of 'addr
×
39
            (** What accounts are at this address? addr must have depth
40
            tree_depth - account_subtree_height *)
41
        | Num_accounts
×
42
            (** How many accounts are there? Used to size data structure and
43
            figure out what part of the tree is filled in. *)
44
      [@@deriving sexp, yojson, hash, compare]
60✔
45

46
      let to_latest : 'a t -> 'a V2.t = function
47
        | What_child_hashes a ->
×
48
            What_child_hashes (a, 1)
49
        | What_contents a ->
×
50
            What_contents a
51
        | Num_accounts ->
×
52
            Num_accounts
53
    end
54
  end]
55
end
56

57
module Answer = struct
58
  [%%versioned
59
  module Stable = struct
60
    module V2 = struct
61
      type ('hash, 'account) t =
20✔
62
        | Child_hashes_are of 'hash Bounded_types.ArrayN4000.Stable.V1.t
×
63
            (** The requested addresses' children have these hashes.
64
            May be any power of 2 number of children, and not necessarily 
65
            immediate children  *)
66
        | Contents_are of 'account list
×
67
            (** The requested address has these accounts *)
68
        | Num_accounts of int * 'hash
×
69
            (** There are this many accounts and the smallest subtree that
70
                contains all non-empty nodes has this hash. *)
71
      [@@deriving sexp, yojson]
60✔
72
    end
73

74
    module V1 = struct
75
      type ('hash, 'account) t =
20✔
76
        | Child_hashes_are of 'hash * 'hash
×
77
            (** The requested address's children have these hashes **)
78
        | Contents_are of 'account list
×
79
            (** The requested address has these accounts *)
80
        | Num_accounts of int * 'hash
×
81
            (** There are this many accounts and the smallest subtree that
82
                contains all non-empty nodes has this hash. *)
83
      [@@deriving sexp, yojson]
60✔
84

85
      let to_latest acct_to_latest = function
86
        | Child_hashes_are (h1, h2) ->
×
87
            V2.Child_hashes_are [| h1; h2 |]
88
        | Contents_are accts ->
×
89
            V2.Contents_are (List.map ~f:acct_to_latest accts)
×
90
        | Num_accounts (i, h) ->
×
91
            V2.Num_accounts (i, h)
92

93
      (* Not a standard versioning function *)
94

95
      (** Attempts to downgrade v2 -> v1 *)
96
      let from_v2 : ('a, 'b) V2.t -> ('a, 'b) t Or_error.t = function
97
        | Child_hashes_are h ->
×
98
            if Array.length h = 2 then Ok (Child_hashes_are (h.(0), h.(1)))
×
99
            else Or_error.error_string "can't downgrade wide query"
×
100
        | Contents_are accs ->
×
101
            Ok (Contents_are accs)
102
        | Num_accounts (n, h) ->
×
103
            Ok (Num_accounts (n, h))
104
    end
105
  end]
106
end
107

108
type daemon_config = { max_subtree_depth : int; default_subtree_depth : int }
109

110
let create_config ~(compile_config : Mina_compile_config.t) ~max_subtree_depth
111
    ~default_subtree_depth () =
112
  { max_subtree_depth =
×
113
      Option.value ~default:compile_config.sync_ledger_max_subtree_depth
×
114
        max_subtree_depth
115
  ; default_subtree_depth =
116
      Option.value ~default:compile_config.sync_ledger_default_subtree_depth
×
117
        default_subtree_depth
118
  }
119

120
module type CONTEXT = sig
121
  val logger : Logger.t
122

123
  val ledger_sync_config : daemon_config
124
end
125

126
module type Inputs_intf = sig
127
  module Addr : module type of Merkle_address
128

129
  module Account : sig
130
    type t [@@deriving bin_io, sexp, yojson]
131
  end
132

133
  module Hash : Merkle_ledger.Intf.Hash with type account := Account.t
134

135
  module Root_hash : sig
136
    type t [@@deriving equal, sexp, yojson]
137

138
    val to_hash : t -> Hash.t
139
  end
140

141
  module MT :
142
    Merkle_ledger.Intf.SYNCABLE
143
      with type hash := Hash.t
144
       and type root_hash := Root_hash.t
145
       and type addr := Addr.t
146
       and type account := Account.t
147

148
  val account_subtree_height : int
149
end
150

151
module type S = sig
152
  type 'a t [@@deriving sexp]
153

154
  type merkle_tree
155

156
  type merkle_path
157

158
  type hash
159

160
  type root_hash
161

162
  type addr
163

164
  type diff
165

166
  type account
167

168
  type index = int
169

170
  type query
171

172
  type answer
173

174
  module Responder : sig
175
    type t
176

177
    val create :
178
         merkle_tree
179
      -> (query -> unit)
180
      -> context:(module CONTEXT)
181
      -> trust_system:Trust_system.t
182
      -> t
183

184
    val answer_query :
185
      t -> query Envelope.Incoming.t -> answer Or_error.t Deferred.t
186
  end
187

188
  val create :
189
       merkle_tree
190
    -> context:(module CONTEXT)
191
    -> trust_system:Trust_system.t
192
    -> 'a t
193

194
  val answer_writer :
195
       'a t
196
    -> (root_hash * query * answer Envelope.Incoming.t) Linear_pipe.Writer.t
197

198
  val query_reader : 'a t -> (root_hash * query) Linear_pipe.Reader.t
199

200
  val destroy : 'a t -> unit
201

202
  val new_goal :
203
       'a t
204
    -> root_hash
205
    -> data:'a
206
    -> equal:('a -> 'a -> bool)
207
    -> [ `Repeat | `New | `Update_data ]
208

209
  val peek_valid_tree : 'a t -> merkle_tree option
210

211
  val valid_tree : 'a t -> (merkle_tree * 'a) Deferred.t
212

213
  val wait_until_valid :
214
       'a t
215
    -> root_hash
216
    -> [ `Ok of merkle_tree | `Target_changed of root_hash option * root_hash ]
217
       Deferred.t
218

219
  val fetch :
220
       'a t
221
    -> root_hash
222
    -> data:'a
223
    -> equal:('a -> 'a -> bool)
224
    -> [ `Ok of merkle_tree | `Target_changed of root_hash option * root_hash ]
225
       Deferred.t
226

227
  val apply_or_queue_diff : 'a t -> diff -> unit
228

229
  val merkle_path_at_addr : 'a t -> addr -> merkle_path Or_error.t
230

231
  val get_account_at_addr : 'a t -> addr -> account Or_error.t
232
end
233

234
(*
235

236
Every node of the merkle tree is always in one of three states:
237

238
- Fresh.
239
  The current contents for this node in the MT match what we
240
  expect.
241
- Stale
242
  The current contents for this node in the MT do _not_ match
243
  what we expect.
244
- Unknown.
245
  We don't know what to expect yet.
246

247

248
Although every node conceptually has one of these states, and can
249
make a transition at any time, the syncer operates only along a
250
"frontier" of the tree, which consists of the deepest Stale nodes.
251

252
The goal of the ledger syncer is to make the root node be fresh,
253
starting from it being stale.
254

255
The syncer usually operates exclusively on these frontier nodes
256
and their direct children. However, the goal hash can change
257
while the syncer is running, and at that point every non-root node
258
conceptually becomes Unknown, and we need to restart. However, we
259
don't need to restart completely: in practice, only small portions
260
of the merkle tree change between goals, and we can re-use the "Stale"
261
nodes we already have if the expected hash doesn't change.
262

263
*)
264
(*
265
Note: while syncing, the underlying ledger is in an
266
indeterminate state. We're mutating hashes at internal
267
nodes without updating their children. In fact, we
268
don't even set all the hashes for the internal nodes!
269
(When we hit a height=N subtree, we don't do anything
270
with the hashes in the bottomost N-1 internal nodes).
271
*)
272

273
module Make (Inputs : Inputs_intf) : sig
274
  open Inputs
275

276
  include
277
    S
278
      with type merkle_tree := MT.t
279
       and type hash := Hash.t
280
       and type root_hash := Root_hash.t
281
       and type addr := Addr.t
282
       and type merkle_path := MT.path
283
       and type account := Account.t
284
       and type query := Addr.t Query.t
285
       and type answer := (Hash.t, Account.t) Answer.t
286
end = struct
287
  open Inputs
288

289
  type diff = unit
290

291
  type index = int
292

293
  type answer = (Hash.t, Account.t) Answer.t
294

295
  type query = Addr.t Query.t
296

297
  (* Provides addresses at an specific depth from this address *)
298
  let intermediate_range ledger_depth addr i =
299
    Array.init (1 lsl i) ~f:(fun idx ->
×
300
        Addr.extend_exn ~ledger_depth addr ~num_bits:i (Int64.of_int idx) )
×
301

302
  module Responder = struct
303
    type t =
304
      { mt : MT.t
305
      ; f : query -> unit
306
      ; context : (module CONTEXT)
307
      ; trust_system : Trust_system.t
308
      }
309

310
    let create :
311
           MT.t
312
        -> (query -> unit)
313
        -> context:(module CONTEXT)
314
        -> trust_system:Trust_system.t
315
        -> t =
316
     fun mt f ~context ~trust_system -> { mt; f; context; trust_system }
×
317

318
    let answer_query :
319
        t -> query Envelope.Incoming.t -> answer Or_error.t Deferred.t =
320
     fun { mt; f; context; trust_system } query_envelope ->
321
      let open (val context) in
×
322
      let open Trust_system in
323
      let ledger_depth = MT.depth mt in
324
      let sender = Envelope.Incoming.sender query_envelope in
×
325
      let query = Envelope.Incoming.data query_envelope in
×
326
      f query ;
×
327
      let response_or_punish =
×
328
        match query with
329
        | What_contents a ->
×
330
            if Addr.height ~ledger_depth a > account_subtree_height then
×
331
              Either.Second
×
332
                ( Actions.Violated_protocol
333
                , Some
334
                    ( "Requested too big of a subtree at once"
335
                    , [ ("addr", Addr.to_yojson a) ] ) )
×
336
            else
337
              let addresses_and_accounts =
×
338
                List.sort ~compare:(fun (addr1, _) (addr2, _) ->
339
                    Addr.compare addr1 addr2 )
×
340
                @@ MT.get_all_accounts_rooted_at_exn mt a
×
341
                (* can't actually throw *)
342
              in
343
              let addresses, accounts = List.unzip addresses_and_accounts in
×
344
              if List.is_empty addresses then
×
345
                (* Peer should know what portions of the tree are full from the
346
                   Num_accounts query. *)
347
                Either.Second
×
348
                  ( Actions.Violated_protocol
349
                  , Some
350
                      ("Requested empty subtree", [ ("addr", Addr.to_yojson a) ])
×
351
                  )
352
              else
353
                let first_address, rest_address =
×
354
                  (List.hd_exn addresses, List.tl_exn addresses)
×
355
                in
356
                let missing_address, is_compact =
357
                  List.fold rest_address
358
                    ~init:(Addr.next first_address, true)
×
359
                    ~f:(fun (expected_address, is_compact) actual_address ->
360
                      if
×
361
                        is_compact
362
                        && [%equal: Addr.t option] expected_address
×
363
                             (Some actual_address)
364
                      then (Addr.next actual_address, true)
×
365
                      else (expected_address, false) )
×
366
                in
367
                if not is_compact then (
×
368
                  (* indicates our ledger is invalid somehow. *)
369
                  [%log fatal]
×
370
                    ~metadata:
371
                      [ ( "missing_address"
372
                        , Addr.to_yojson (Option.value_exn missing_address) )
×
373
                      ; ( "addresses_and_accounts"
374
                        , `List
375
                            (List.map addresses_and_accounts
×
376
                               ~f:(fun (addr, account) ->
377
                                 `Tuple
×
378
                                   [ Addr.to_yojson addr
×
379
                                   ; Account.to_yojson account
×
380
                                   ] ) ) )
381
                      ]
382
                    "Missing an account at address: $missing_address inside \
383
                     the list: $addresses_and_accounts" ;
384
                  assert false )
×
385
                else Either.First (Answer.Contents_are accounts)
×
386
        | Num_accounts ->
×
387
            let len = MT.num_accounts mt in
388
            let height = Int.ceil_log2 len in
×
389
            (* FIXME: bug when height=0 https://github.com/o1-labs/nanobit/issues/365 *)
390
            let content_root_addr =
×
391
              funpow
392
                (MT.depth mt - height)
×
393
                (fun a -> Addr.child_exn ~ledger_depth a Direction.Left)
×
394
                (Addr.root ())
×
395
            in
396
            Either.First
×
397
              (Num_accounts
398
                 (len, MT.get_inner_hash_at_addr_exn mt content_root_addr) )
×
399
        | What_child_hashes (a, subtree_depth) -> (
×
400
            match subtree_depth with
401
            | n when n >= 1 -> (
×
402
                let subtree_depth =
403
                  min n ledger_sync_config.max_subtree_depth
404
                in
405
                let ledger_depth = MT.depth mt in
×
406
                let addresses =
×
407
                  intermediate_range ledger_depth a subtree_depth
408
                in
409
                match
×
410
                  Or_error.try_with (fun () ->
411
                      let get_hash a = MT.get_inner_hash_at_addr_exn mt a in
×
412
                      let hashes = Array.map addresses ~f:get_hash in
413
                      Answer.Child_hashes_are hashes )
×
414
                with
415
                | Ok answer ->
×
416
                    Either.First answer
417
                | Error e ->
×
418
                    [%log error]
×
419
                      ~metadata:[ ("error", Error_json.error_to_yojson e) ]
×
420
                      "When handling What_child_hashes request, the following \
421
                       error happended: $error" ;
422
                    Either.Second
×
423
                      ( Actions.Violated_protocol
424
                      , Some
425
                          ( "Invalid address in What_child_hashes request"
426
                          , [ ("addr", Addr.to_yojson a) ] ) ) )
×
427
            | _ ->
×
428
                [%log error]
×
429
                  "When handling What_child_hashes request, the depth was \
430
                   outside the valid range" ;
431
                Either.Second
×
432
                  ( Actions.Violated_protocol
433
                  , Some
434
                      ( "Invalid depth requested in What_child_hashes request"
435
                      , [ ("addr", Addr.to_yojson a) ] ) ) )
×
436
      in
437

438
      match response_or_punish with
439
      | Either.First answer ->
×
440
          Deferred.return @@ Ok answer
441
      | Either.Second action ->
×
442
          let%map _ =
443
            record_envelope_sender trust_system logger sender action
×
444
          in
445
          let err =
×
446
            Option.value_map ~default:"Violated protocol" (snd action) ~f:fst
×
447
          in
448
          Or_error.error_string err
×
449
  end
450

451
  type 'a t =
452
    { mutable desired_root : Root_hash.t option
453
    ; mutable auxiliary_data : 'a option
454
    ; tree : MT.t
455
    ; trust_system : Trust_system.t
456
    ; answers :
457
        (Root_hash.t * query * answer Envelope.Incoming.t) Linear_pipe.Reader.t
458
    ; answer_writer :
459
        (Root_hash.t * query * answer Envelope.Incoming.t) Linear_pipe.Writer.t
460
    ; queries : (Root_hash.t * query) Linear_pipe.Writer.t
461
    ; query_reader : (Root_hash.t * query) Linear_pipe.Reader.t
462
    ; waiting_parents : Hash.t Addr.Table.t
463
          (** Addresses we are waiting for the children of, and the expected
464
              hash of the node with the address. *)
465
    ; waiting_content : Hash.t Addr.Table.t
466
    ; mutable validity_listener :
467
        [ `Ok | `Target_changed of Root_hash.t option * Root_hash.t ] Ivar.t
468
    ; context : (module CONTEXT)
469
    }
470

471
  let t_of_sexp _ = failwith "t_of_sexp: not implemented"
×
472

473
  let sexp_of_t _ = failwith "sexp_of_t: not implemented"
×
474

475
  let desired_root_exn { desired_root; _ } = desired_root |> Option.value_exn
×
476

477
  let destroy t =
478
    Linear_pipe.close_read t.answers ;
×
479
    Linear_pipe.close_read t.query_reader
×
480

481
  let answer_writer t = t.answer_writer
×
482

483
  let query_reader t = t.query_reader
×
484

485
  let expect_children : 'a t -> Addr.t -> Hash.t -> unit =
486
   fun t parent_addr expected ->
487
    let open (val t.context) in
×
488
    [%log trace]
×
489
      ~metadata:
490
        [ ("parent_address", Addr.to_yojson parent_addr)
×
491
        ; ("hash", Hash.to_yojson expected)
×
492
        ]
493
      "Expecting children parent $parent_address, expected: $hash" ;
494
    Addr.Table.add_exn t.waiting_parents ~key:parent_addr ~data:expected
×
495

496
  let expect_content : 'a t -> Addr.t -> Hash.t -> unit =
497
   fun t addr expected ->
498
    let open (val t.context) in
×
499
    [%log trace]
×
500
      ~metadata:
501
        [ ("address", Addr.to_yojson addr); ("hash", Hash.to_yojson expected) ]
×
502
      "Expecting content addr $address, expected: $hash" ;
503
    Addr.Table.add_exn t.waiting_content ~key:addr ~data:expected
×
504

505
  (** Given an address and the accounts below that address, fill in the tree
506
      with them. *)
507
  let add_content :
508
         'a t
509
      -> Addr.t
510
      -> Account.t list
511
      -> [ `Success
512
         | `Hash_mismatch of Hash.t * Hash.t  (** expected hash, actual *) ] =
513
   fun t addr content ->
514
    let open (val t.context) in
×
515
    let expected = Addr.Table.find_exn t.waiting_content addr in
516
    (* TODO #444 should we batch all the updates and do them at the end? *)
517
    (* We might write the wrong data to the underlying ledger here, but if so
518
       we'll requeue the address and it'll be overwritten. *)
519
    MT.set_all_accounts_rooted_at_exn t.tree addr content ;
×
520
    Addr.Table.remove t.waiting_content addr ;
×
521
    [%log trace]
×
522
      ~metadata:
523
        [ ("address", Addr.to_yojson addr); ("hash", Hash.to_yojson expected) ]
×
524
      "Found content addr $address, with hash $hash, removing from waiting \
525
       content" ;
526
    let actual = MT.get_inner_hash_at_addr_exn t.tree addr in
×
527
    if Hash.equal actual expected then `Success
×
528
    else `Hash_mismatch (expected, actual)
×
529

530
  (* Merges each 2 contigous nodes, halving the size of the array *)
531
  let merge_siblings : Hash.t array -> index -> Hash.t array =
532
   fun nodes height ->
533
    let len = Array.length nodes in
×
534
    if len mod 2 <> 0 then failwith "length must be even" ;
×
535
    let half_len = len / 2 in
×
536
    let f i = Hash.merge ~height nodes.(2 * i) nodes.((2 * i) + 1) in
×
537
    Array.init half_len ~f
538

539
  (* Assumes nodes to be a power of 2 and merges them into their common root *)
540
  let rec merge_many : Hash.t array -> index -> Hash.t =
541
   fun nodes height ->
542
    let len = Array.length nodes in
×
543
    match len with
×
544
    | 1 ->
×
545
        nodes.(0)
546
    | _ ->
×
547
        let half = merge_siblings nodes height in
548
        merge_many half (height + 1)
×
549

550
  let merge_many : Hash.t array -> index -> index -> Hash.t =
551
   fun nodes height subtree_depth ->
552
    let bottom_height = height - subtree_depth in
×
553
    let hash = merge_many nodes bottom_height in
554
    hash
×
555

556
  (* Adds the subtree given as the 2^k subtree leaves with the given prefix address *)
557
  (* Returns next nodes to be checked *)
558
  let add_subtree :
559
         'a t
560
      -> Addr.t
561
      -> Hash.t array
562
      -> int
563
      -> [ `Good of (Addr.t * Hash.t) array
564
         | `Hash_mismatch of Hash.t * Hash.t
565
         | `Invalid_length ] =
566
   fun t addr nodes requested_depth ->
567
    let open (val t.context) in
×
568
    let len = Array.length nodes in
569
    let is_power = Int.is_pow2 len in
×
570
    let is_more_than_two = len >= 2 in
×
571
    let subtree_depth = Int.ceil_log2 len in
572
    let less_than_requested = subtree_depth <= requested_depth in
×
573
    let valid_length = is_power && is_more_than_two && less_than_requested in
×
574
    if valid_length then
575
      let ledger_depth = MT.depth t.tree in
×
576
      let expected =
×
577
        Option.value_exn ~message:"Forgot to wait for a node"
578
          (Addr.Table.find t.waiting_parents addr)
×
579
      in
580
      let merged =
×
581
        merge_many nodes (ledger_depth - Addr.depth addr) subtree_depth
×
582
      in
583
      if Hash.equal expected merged then (
×
584
        Addr.Table.remove t.waiting_parents addr ;
585
        let addresses = intermediate_range ledger_depth addr subtree_depth in
×
586
        let addresses_and_hashes = Array.zip_exn addresses nodes in
×
587

588
        (* Filter to fetch only those that differ *)
589
        let should_fetch_children addr hash =
×
590
          not @@ Hash.equal (MT.get_inner_hash_at_addr_exn t.tree addr) hash
×
591
        in
592
        let subtrees_to_fetch =
593
          addresses_and_hashes
594
          |> Array.filter ~f:(Tuple2.uncurry should_fetch_children)
×
595
        in
596
        `Good subtrees_to_fetch )
×
597
      else `Hash_mismatch (expected, merged)
×
598
    else `Invalid_length
×
599

600
  let all_done t =
601
    let open (val t.context) in
×
602
    if not (Root_hash.equal (MT.merkle_root t.tree) (desired_root_exn t)) then
×
603
      failwith "We finished syncing, but made a mistake somewhere :("
×
604
    else (
×
605
      if Ivar.is_full t.validity_listener then
606
        [%log error] "Ivar.fill bug is here!" ;
×
607
      Ivar.fill t.validity_listener `Ok )
×
608

609
  (** Compute the hash of an empty tree of the specified height. *)
610
  let empty_hash_at_height h =
611
    let rec go prev ctr =
×
612
      if ctr = h then prev else go (Hash.merge ~height:ctr prev prev) (ctr + 1)
×
613
    in
614
    go Hash.empty_account 0
615

616
  (** Given the hash of the smallest subtree that contains all accounts, the
617
      height of that hash in the tree and the height of the whole tree, compute
618
      the hash of the whole tree. *)
619
  let complete_with_empties hash start_height result_height =
620
    let rec go cur_empty prev_hash height =
×
621
      if height = result_height then prev_hash
×
622
      else
623
        let cur = Hash.merge ~height prev_hash cur_empty in
×
624
        let next_empty = Hash.merge ~height cur_empty cur_empty in
×
625
        go next_empty cur (height + 1)
×
626
    in
627
    go (empty_hash_at_height start_height) hash start_height
×
628

629
  (** Given an address and the hash of the corresponding subtree, start getting
630
      the children.
631
  *)
632
  let handle_node t addr exp_hash =
633
    let open (val t.context) in
×
634
    if Addr.depth addr >= MT.depth t.tree - account_subtree_height then (
×
635
      expect_content t addr exp_hash ;
636
      Linear_pipe.write_without_pushback_if_open t.queries
×
637
        (desired_root_exn t, What_contents addr) )
×
638
    else (
×
639
      expect_children t addr exp_hash ;
640
      Linear_pipe.write_without_pushback_if_open t.queries
×
641
        ( desired_root_exn t
×
642
        , What_child_hashes (addr, ledger_sync_config.default_subtree_depth) ) )
643

644
  (** Handle the initial Num_accounts message, starting the main syncing
645
      process. *)
646
  let handle_num_accounts :
647
      'a t -> int -> Hash.t -> [ `Success | `Hash_mismatch of Hash.t * Hash.t ]
648
      =
649
   fun t n content_hash ->
650
    let rh = Root_hash.to_hash (desired_root_exn t) in
×
651
    let height = Int.ceil_log2 n in
×
652
    (* FIXME: bug when height=0 https://github.com/o1-labs/nanobit/issues/365 *)
653
    let actual = complete_with_empties content_hash height (MT.depth t.tree) in
×
654
    if Hash.equal actual rh then (
×
655
      Addr.Table.clear t.waiting_parents ;
656
      (* We should use this information to set the empty account slots empty and
657
         start syncing at the content root. See #1972. *)
658
      Addr.Table.clear t.waiting_content ;
×
659
      handle_node t (Addr.root ()) rh ;
×
660
      `Success )
×
661
    else `Hash_mismatch (rh, actual)
×
662

663
  let main_loop t =
664
    let open (val t.context) in
×
665
    let handle_answer :
666
           Root_hash.t
667
           * Addr.t Query.t
668
           * (Hash.t, Account.t) Answer.t Envelope.Incoming.t
669
        -> unit Deferred.t =
670
     fun (root_hash, query, env) ->
671
      (* NOTE: think about synchronization here. This is deferred now, so
672
         the t and the underlying ledger can change while processing is
673
         happening. *)
674
      let already_done =
×
675
        match Ivar.peek t.validity_listener with Some `Ok -> true | _ -> false
×
676
      in
677
      let sender = Envelope.Incoming.sender env in
678
      let answer = Envelope.Incoming.data env in
×
679
      [%log trace]
×
680
        ~metadata:
681
          [ ("root_hash", Root_hash.to_yojson root_hash)
×
682
          ; ("query", Query.to_yojson Addr.to_yojson query)
×
683
          ]
684
        "Handle answer for $root_hash" ;
685
      if not (Root_hash.equal root_hash (desired_root_exn t)) then (
×
686
        [%log trace]
×
687
          ~metadata:
688
            [ ("desired_hash", Root_hash.to_yojson (desired_root_exn t))
×
689
            ; ("ignored_hash", Root_hash.to_yojson root_hash)
×
690
            ]
691
          "My desired root was $desired_hash, so I'm ignoring $ignored_hash" ;
692
        Deferred.unit )
×
693
      else if already_done then (
×
694
        (* This can happen if we asked for hashes that turn out to be equal in
695
           underlying ledger and the target. *)
696
        [%log debug] "Got sync response when we're already finished syncing" ;
×
697
        Deferred.unit )
×
698
      else
699
        let open Trust_system in
×
700
        (* If a peer misbehaves we still need the information we asked them for,
701
           so requeue in that case. *)
702
        let requeue_query () =
703
          Linear_pipe.write_without_pushback_if_open t.queries (root_hash, query)
×
704
        in
705
        let credit_fulfilled_request () =
706
          record_envelope_sender t.trust_system logger sender
×
707
            ( Actions.Fulfilled_request
708
            , Some
709
                ( "sync ledger query $query"
710
                , [ ("query", Query.to_yojson Addr.to_yojson query) ] ) )
×
711
        in
712
        let%bind _ =
713
          match (query, answer) with
714
          | Query.What_contents addr, Answer.Contents_are leaves -> (
×
715
              match add_content t addr leaves with
716
              | `Success ->
×
717
                  credit_fulfilled_request ()
×
718
              | `Hash_mismatch (expected, actual) ->
×
719
                  let%map () =
720
                    record_envelope_sender t.trust_system logger sender
×
721
                      ( Actions.Sent_bad_hash
722
                      , Some
723
                          ( "sent accounts $accounts for address $addr, they \
724
                             hash to $actual but we expected $expected"
725
                          , [ ( "accounts"
726
                              , `List (List.map ~f:Account.to_yojson leaves) )
×
727
                            ; ("addr", Addr.to_yojson addr)
×
728
                            ; ("actual", Hash.to_yojson actual)
×
729
                            ; ("expected", Hash.to_yojson expected)
×
730
                            ] ) )
731
                  in
732
                  requeue_query () )
×
733
          | Query.Num_accounts, Answer.Num_accounts (count, content_root) -> (
×
734
              match handle_num_accounts t count content_root with
735
              | `Success ->
×
736
                  credit_fulfilled_request ()
×
737
              | `Hash_mismatch (expected, actual) ->
×
738
                  let%map () =
739
                    record_envelope_sender t.trust_system logger sender
×
740
                      ( Actions.Sent_bad_hash
741
                      , Some
742
                          ( "Claimed num_accounts $count, content root hash \
743
                             $content_root_hash, that implies a root hash of \
744
                             $actual, we expected $expected"
745
                          , [ ("count", `Int count)
746
                            ; ("content_root_hash", Hash.to_yojson content_root)
×
747
                            ; ("actual", Hash.to_yojson actual)
×
748
                            ; ("expected", Hash.to_yojson expected)
×
749
                            ] ) )
750
                  in
751
                  requeue_query () )
×
752
          | ( Query.What_child_hashes (address, requested_depth)
×
753
            , Answer.Child_hashes_are hashes ) -> (
754
              match add_subtree t address hashes requested_depth with
755
              | `Hash_mismatch (expected, actual) ->
×
756
                  let%map () =
757
                    record_envelope_sender t.trust_system logger sender
×
758
                      ( Actions.Sent_bad_hash
759
                      , Some
760
                          ( "hashes sent for subtree on address $address merge \
761
                             to $actual_merge but we expected $expected_merge"
762
                          , [ ("actual_merge", Hash.to_yojson actual)
×
763
                            ; ("expected_merge", Hash.to_yojson expected)
×
764
                            ] ) )
765
                  in
766
                  requeue_query ()
×
767
              | `Invalid_length ->
×
768
                  let%map () =
769
                    record_envelope_sender t.trust_system logger sender
×
770
                      ( Actions.Sent_bad_hash
771
                      , Some
772
                          ( "hashes sent for subtree on address $address must \
773
                             be a power of 2 in the range 2-2^$depth"
774
                          , [ ( "depth"
775
                              , `Int ledger_sync_config.max_subtree_depth )
776
                            ] ) )
777
                  in
778
                  requeue_query ()
×
779
              | `Good children_to_verify ->
×
780
                  Array.iter children_to_verify ~f:(fun (addr, hash) ->
781
                      handle_node t addr hash ) ;
×
782
                  credit_fulfilled_request () )
×
783
          | query, answer ->
×
784
              let%map () =
785
                record_envelope_sender t.trust_system logger sender
×
786
                  ( Actions.Violated_protocol
787
                  , Some
788
                      ( "Answered question we didn't ask! Query was $query \
789
                         answer was $answer"
790
                      , [ ("query", Query.to_yojson Addr.to_yojson query)
×
791
                        ; ( "answer"
792
                          , Answer.to_yojson Hash.to_yojson Account.to_yojson
×
793
                              answer )
794
                        ] ) )
795
              in
796
              requeue_query ()
×
797
        in
798
        if
×
799
          Root_hash.equal
800
            (Option.value_exn t.desired_root)
×
801
            (MT.merkle_root t.tree)
×
802
        then (
×
803
          [%str_log trace] Snarked_ledger_synced ;
×
804
          all_done t ) ;
×
805
        Deferred.unit
×
806
    in
807
    Linear_pipe.iter t.answers ~f:handle_answer
808

809
  let new_goal t h ~data ~equal =
810
    let open (val t.context) in
×
811
    let should_skip =
812
      match t.desired_root with
813
      | None ->
×
814
          false
815
      | Some h' ->
×
816
          Root_hash.equal h h'
×
817
    in
818
    if not should_skip then (
×
819
      Option.iter t.desired_root ~f:(fun root_hash ->
820
          [%log debug]
×
821
            ~metadata:
822
              [ ("old_root_hash", Root_hash.to_yojson root_hash)
×
823
              ; ("new_root_hash", Root_hash.to_yojson h)
×
824
              ]
825
            "New_goal: changing target from $old_root_hash to $new_root_hash" ) ;
826
      Ivar.fill_if_empty t.validity_listener
×
827
        (`Target_changed (t.desired_root, h)) ;
828
      t.validity_listener <- Ivar.create () ;
×
829
      t.desired_root <- Some h ;
830
      t.auxiliary_data <- Some data ;
831
      Linear_pipe.write_without_pushback_if_open t.queries (h, Num_accounts) ;
832
      `New )
×
833
    else if
×
834
      Option.fold t.auxiliary_data ~init:false ~f:(fun _ saved_data ->
835
          equal data saved_data )
×
836
    then (
×
837
      [%log debug] "New_goal to same hash, not doing anything" ;
×
838
      `Repeat )
×
839
    else (
×
840
      t.auxiliary_data <- Some data ;
841
      `Update_data )
842

843
  let rec valid_tree t =
844
    match%bind Ivar.read t.validity_listener with
×
845
    | `Ok ->
×
846
        return (t.tree, Option.value_exn t.auxiliary_data)
×
847
    | `Target_changed _ ->
×
848
        valid_tree t
849

850
  let peek_valid_tree t =
851
    Option.bind (Ivar.peek t.validity_listener) ~f:(function
×
852
      | `Ok ->
×
853
          Some t.tree
854
      | `Target_changed _ ->
×
855
          None )
856

857
  let wait_until_valid t h =
858
    if not (Root_hash.equal h (desired_root_exn t)) then
×
859
      return (`Target_changed (t.desired_root, h))
×
860
    else
861
      Deferred.map (Ivar.read t.validity_listener) ~f:(function
×
862
        | `Target_changed payload ->
×
863
            `Target_changed payload
864
        | `Ok ->
×
865
            `Ok t.tree )
866

867
  let fetch t rh ~data ~equal =
868
    ignore (new_goal t rh ~data ~equal : [ `New | `Repeat | `Update_data ]) ;
×
869
    wait_until_valid t rh
870

871
  let create mt ~context ~trust_system =
872
    let qr, qw = Linear_pipe.create () in
×
873
    let ar, aw = Linear_pipe.create () in
×
874
    let t =
×
875
      { desired_root = None
876
      ; auxiliary_data = None
877
      ; tree = mt
878
      ; trust_system
879
      ; answers = ar
880
      ; answer_writer = aw
881
      ; queries = qw
882
      ; query_reader = qr
883
      ; waiting_parents = Addr.Table.create ()
×
884
      ; waiting_content = Addr.Table.create ()
×
885
      ; validity_listener = Ivar.create ()
×
886
      ; context
887
      }
888
    in
889
    don't_wait_for (main_loop t) ;
×
890
    t
×
891

892
  let apply_or_queue_diff _ _ =
893
    (* Need some interface for the diffs, not sure the layering is right here. *)
894
    failwith "todo"
×
895

896
  let merkle_path_at_addr _ = failwith "no"
×
897

898
  let get_account_at_addr _ = failwith "no"
×
899
end
40✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc