• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

MinaProtocol / mina / 242

04 Jun 2025 08:45PM UTC coverage: 36.779% (-19.8%) from 56.614%
242

push

buildkite

web-flow
Merge pull request #17086 from MinaProtocol/dkijania/port_terraform_removal

port terraform removal to develop

26262 of 71405 relevant lines covered (36.78%)

34615.67 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

59.11
/src/lib/syncable_ledger/syncable_ledger.ml
1
open Core_kernel
24✔
2
open Async_kernel
3
open Pipe_lib
4
open Network_peer
5

6
type Structured_log_events.t += Snarked_ledger_synced
7
  [@@deriving register_event { msg = "Snarked database sync'd. All done" }]
4✔
8

9
(** [funpow n f r] applies [f] to [r] [n] times in a row, e.g.
    [funpow 3 f r = f (f (f r))]. A non-positive [n] returns [r] unchanged. *)
let rec funpow n f r = if n <= 0 then r else funpow (n - 1) f (f r)
8✔
12

13
(** Queries a syncing node sends to its peers. [V2] is the latest version; it
    extends [V1]'s [What_child_hashes] with a subtree depth. *)
module Query = struct
14
  [%%versioned
15
  module Stable = struct
16
    module V2 = struct
17
      type 'addr t =
33✔
18
        | What_child_hashes of 'addr * int
×
19
            (** What are the hashes of the children of this address?
20
            If depth > 1 then we get the leaves of a subtree rooted
21
            at address and of the given depth.
22
            For depth = 1 we have the simplest case with just the 2
23
            direct children.
24
            *)
25
        | What_contents of 'addr
24✔
26
            (** What accounts are at this address? addr must have depth
27
            tree_depth - account_subtree_height *)
28
        | Num_accounts
112✔
29
            (** How many accounts are there? Used to size data structure and
30
            figure out what part of the tree is filled in. *)
31
      [@@deriving sexp, yojson, hash, compare]
72✔
32
    end
33

34
    module V1 = struct
35
      type 'addr t =
24✔
36
        | What_child_hashes of 'addr
×
37
            (** What are the hashes of the children of this address? *)
38
        | What_contents of 'addr
×
39
            (** What accounts are at this address? addr must have depth
40
            tree_depth - account_subtree_height *)
41
        | Num_accounts
×
42
            (** How many accounts are there? Used to size data structure and
43
            figure out what part of the tree is filled in. *)
44
      [@@deriving sexp, yojson, hash, compare]
72✔
45

46
      (** Upgrades a V1 query to V2. A V1 [What_child_hashes] only ever asked
          for the two direct children, so it maps to a V2 query of depth 1. *)
      let to_latest : 'a t -> 'a V2.t = function
47
        | What_child_hashes a ->
×
48
            What_child_hashes (a, 1)
49
        | What_contents a ->
×
50
            What_contents a
51
        | Num_accounts ->
×
52
            Num_accounts
53
    end
54
  end]
55
end
56

57
(** Answers peers send back for a [Query.t]. [V2] is the latest version; its
    [Child_hashes_are] carries an array of hashes where [V1] carried exactly
    two. *)
module Answer = struct
58
  [%%versioned
59
  module Stable = struct
60
    module V2 = struct
61
      type ('hash, 'account) t =
36✔
62
        | Child_hashes_are of 'hash Bounded_types.ArrayN4000.Stable.V1.t
×
63
            (** The requested addresses' children have these hashes.
64
            May be any power of 2 number of children, and not necessarily
65
            immediate children  *)
66
        | Contents_are of 'account list
×
67
            (** The requested address has these accounts *)
68
        | Num_accounts of int * 'hash
×
69
            (** There are this many accounts and the smallest subtree that
70
                contains all non-empty nodes has this hash. *)
71
      [@@deriving sexp, yojson]
72✔
72
    end
73

74
    module V1 = struct
75
      type ('hash, 'account) t =
24✔
76
        | Child_hashes_are of 'hash * 'hash
×
77
            (** The requested address's children have these hashes **)
78
        | Contents_are of 'account list
×
79
            (** The requested address has these accounts *)
80
        | Num_accounts of int * 'hash
×
81
            (** There are this many accounts and the smallest subtree that
82
                contains all non-empty nodes has this hash. *)
83
      [@@deriving sexp, yojson]
72✔
84

85
      (** Upgrades a V1 answer to V2: the pair of child hashes becomes a
          two-element array and every account is converted with
          [acct_to_latest]. *)
      let to_latest acct_to_latest = function
86
        | Child_hashes_are (h1, h2) ->
×
87
            V2.Child_hashes_are [| h1; h2 |]
88
        | Contents_are accts ->
×
89
            V2.Contents_are (List.map ~f:acct_to_latest accts)
×
90
        | Num_accounts (i, h) ->
×
91
            V2.Num_accounts (i, h)
92

93
      (* Not a standard versioning function *)
94

95
      (** Attempts to downgrade v2 -> v1. Returns an error when the V2 answer
          carries a number of child hashes other than two, since V1 can only
          represent a pair. Accounts are passed through unchanged. *)
96
      let from_v2 : ('a, 'b) V2.t -> ('a, 'b) t Or_error.t = function
97
        | Child_hashes_are h ->
×
98
            if Array.length h = 2 then Ok (Child_hashes_are (h.(0), h.(1)))
×
99
            else Or_error.error_string "can't downgrade wide query"
×
100
        | Contents_are accs ->
×
101
            Ok (Contents_are accs)
102
        | Num_accounts (n, h) ->
×
103
            Ok (Num_accounts (n, h))
104
    end
105
  end]
106
end
107

108
(** Ledger sync depth settings. [max_subtree_depth] caps the subtree depth a
    responder serves for a [What_child_hashes] query; [default_subtree_depth]
    is the depth the syncer asks for in its own [What_child_hashes] queries. *)
type daemon_config = { max_subtree_depth : int; default_subtree_depth : int }
109

110
(** Builds a [daemon_config]. Each of [max_subtree_depth] and
    [default_subtree_depth] is an [int option]; when it is [None] the matching
    [sync_ledger_*] field of [compile_config] is used instead. *)
let create_config ~(compile_config : Mina_compile_config.t) ~max_subtree_depth
    ~default_subtree_depth () =
  let max_subtree_depth =
    match max_subtree_depth with
    | Some depth ->
        depth
    | None ->
        compile_config.sync_ledger_max_subtree_depth
  in
  let default_subtree_depth =
    match default_subtree_depth with
    | Some depth ->
        depth
    | None ->
        compile_config.sync_ledger_default_subtree_depth
  in
  { max_subtree_depth; default_subtree_depth }
119

120
(** Ambient dependencies that the responder and the syncer unpack with
    [let open (val context)]: a logger and the ledger sync depth settings. *)
module type CONTEXT = sig
121
  val logger : Logger.t
122

123
  val ledger_sync_config : daemon_config
124
end
125

126
(** Everything the [Make] functor needs: merkle addresses, accounts, node
    hashes, root hashes and the syncable merkle tree itself. *)
module type Inputs_intf = sig
127
  module Addr : module type of Merkle_address
128

129
  module Account : sig
130
    type t [@@deriving bin_io, sexp, yojson]
131
  end
132

133
  module Hash : Merkle_ledger.Intf.Hash with type account := Account.t
134

135
  module Root_hash : sig
136
    type t [@@deriving equal, sexp, yojson]
137

138
    (** View a root hash as an ordinary node hash. *)
    val to_hash : t -> Hash.t
139
  end
140

141
  module MT :
142
    Merkle_ledger.Intf.SYNCABLE
143
      with type hash := Hash.t
144
       and type root_hash := Root_hash.t
145
       and type addr := Addr.t
146
       and type account := Account.t
147

148
  (** Height of the subtrees whose accounts are transferred in one piece: a
      [What_contents] query above this height is rejected by the responder,
      and the syncer switches from child-hash queries to content queries once
      an address is this close to the leaves. *)
  val account_subtree_height : int
149
end
150

151
(** Interface of a ledger syncer. ['a t] is the syncer state, parameterised by
    the type of auxiliary data attached to a sync goal. *)
module type S = sig
152
  type 'a t [@@deriving sexp]
153

154
  type merkle_tree
155

156
  type merkle_path
157

158
  type hash
159

160
  type root_hash
161

162
  type addr
163

164
  type diff
165

166
  type account
167

168
  type index = int
169

170
  type query
171

172
  type answer
173

174
  (** Serves sync queries out of a local merkle tree. *)
  module Responder : sig
175
    type t
176

177
    (** [create mt f ~context ~trust_system] builds a responder over [mt];
        [f] is invoked on every query before it is answered. *)
    val create :
178
         merkle_tree
179
      -> (query -> unit)
180
      -> context:(module CONTEXT)
181
      -> trust_system:Trust_system.t
182
      -> t
183

184
    (** Answers one query. A query that violates the protocol is recorded
        against its sender in the trust system and yields an [Error]. *)
    val answer_query :
185
      t -> query Envelope.Incoming.t -> answer Or_error.t Deferred.t
186
  end
187

188
  val create :
189
       merkle_tree
190
    -> context:(module CONTEXT)
191
    -> trust_system:Trust_system.t
192
    -> 'a t
193

194
  (** Pipe end on which answers are fed to the syncer, each tagged with the
      root hash and the query it responds to. *)
  val answer_writer :
195
       'a t
196
    -> (root_hash * query * answer Envelope.Incoming.t) Linear_pipe.Writer.t
197

198
  (** Pipe end from which the syncer's outgoing queries are read, each tagged
      with the root hash being synced to. *)
  val query_reader : 'a t -> (root_hash * query) Linear_pipe.Reader.t
199

200
  (** Closes the read ends of the syncer's answer and query pipes. *)
  val destroy : 'a t -> unit
201

202
  val new_goal :
203
       'a t
204
    -> root_hash
205
    -> data:'a
206
    -> equal:('a -> 'a -> bool)
207
    -> [ `Repeat | `New | `Update_data ]
208

209
  val peek_valid_tree : 'a t -> merkle_tree option
210

211
  val valid_tree : 'a t -> (merkle_tree * 'a) Deferred.t
212

213
  val wait_until_valid :
214
       'a t
215
    -> root_hash
216
    -> [ `Ok of merkle_tree | `Target_changed of root_hash option * root_hash ]
217
       Deferred.t
218

219
  val fetch :
220
       'a t
221
    -> root_hash
222
    -> data:'a
223
    -> equal:('a -> 'a -> bool)
224
    -> [ `Ok of merkle_tree | `Target_changed of root_hash option * root_hash ]
225
       Deferred.t
226

227
  val apply_or_queue_diff : 'a t -> diff -> unit
228

229
  val merkle_path_at_addr : 'a t -> addr -> merkle_path Or_error.t
230

231
  val get_account_at_addr : 'a t -> addr -> account Or_error.t
232
end
233

234
(*
235

236
Every node of the merkle tree is always in one of three states:
237

238
- Fresh.
239
  The current contents for this node in the MT match what we
240
  expect.
241
- Stale
242
  The current contents for this node in the MT do _not_ match
243
  what we expect.
244
- Unknown.
245
  We don't know what to expect yet.
246

247

248
Although every node conceptually has one of these states, and can
249
make a transition at any time, the syncer operates only along a
250
"frontier" of the tree, which consists of the deepest Stale nodes.
251

252
The goal of the ledger syncer is to make the root node be fresh,
253
starting from it being stale.
254

255
The syncer usually operates exclusively on these frontier nodes
256
and their direct children. However, the goal hash can change
257
while the syncer is running, and at that point every non-root node
258
conceptually becomes Unknown, and we need to restart. However, we
259
don't need to restart completely: in practice, only small portions
260
of the merkle tree change between goals, and we can re-use the "Stale"
261
nodes we already have if the expected hash doesn't change.
262

263
*)
264
(*
265
Note: while syncing, the underlying ledger is in an
266
indeterminate state. We're mutating hashes at internal
267
nodes without updating their children. In fact, we
268
don't even set all the hashes for the internal nodes!
269
(When we hit a height=N subtree, we don't do anything
270
with the hashes in the bottommost N-1 internal nodes).
271
*)
272

273
module Make (Inputs : Inputs_intf) : sig
274
  open Inputs
275

276
  include
277
    S
278
      with type merkle_tree := MT.t
279
       and type hash := Hash.t
280
       and type root_hash := Root_hash.t
281
       and type addr := Addr.t
282
       and type merkle_path := MT.path
283
       and type account := Account.t
284
       and type query := Addr.t Query.t
285
       and type answer := (Hash.t, Account.t) Answer.t
286
end = struct
287
  open Inputs
288

289
  (* Concrete instantiations of the abstract types declared in [S]. *)
  type diff = unit
290

291
  type index = int
292

293
  (* Answers carry this functor's node hashes and accounts. *)
  type answer = (Hash.t, Account.t) Answer.t
294

295
  (* Queries are addressed with this functor's merkle addresses. *)
  type query = Addr.t Query.t
296

297
  (* [intermediate_range ledger_depth addr i] returns the [2^i] addresses
     obtained by extending [addr] with every [i]-bit suffix, in increasing
     suffix order. Raises whatever [Addr.extend_exn] raises when the extension
     is not valid for [ledger_depth]. *)
  let intermediate_range ledger_depth addr i =
    let descendant suffix =
      Addr.extend_exn ~ledger_depth addr ~num_bits:i (Int64.of_int suffix)
    in
    Array.init (1 lsl i) ~f:descendant
128✔
301

302
  (** Answers sync queries from peers out of a local merkle tree. *)
  module Responder = struct
    type t =
      { mt : MT.t  (** ledger the answers are read from *)
      ; f : query -> unit  (** callback run on every incoming query *)
      ; context : (module CONTEXT)
      ; trust_system : Trust_system.t
      }

    let create :
           MT.t
        -> (query -> unit)
        -> context:(module CONTEXT)
        -> trust_system:Trust_system.t
        -> t =
     fun mt f ~context ~trust_system -> { mt; f; context; trust_system }

    (** [answer_query t query_envelope] runs the callback on the query, then
        computes the answer from the ledger. Queries that violate the protocol
        (too large a subtree, an empty subtree, an invalid depth or address)
        are recorded against the sender in the trust system and produce an
        [Error]; everything else produces [Ok answer]. *)
    let answer_query :
        t -> query Envelope.Incoming.t -> answer Or_error.t Deferred.t =
     fun { mt; f; context; trust_system } query_envelope ->
      let open (val context) in
      let open Trust_system in
      let ledger_depth = MT.depth mt in
      let sender = Envelope.Incoming.sender query_envelope in
      let query = Envelope.Incoming.data query_envelope in
      f query ;
      let response_or_punish =
        match query with
        | What_contents a ->
            if Addr.height ~ledger_depth a > account_subtree_height then
              Either.Second
                ( Actions.Violated_protocol
                , Some
                    ( "Requested too big of a subtree at once"
                    , [ ("addr", Addr.to_yojson a) ] ) )
            else
              let addresses_and_accounts =
                List.sort ~compare:(fun (addr1, _) (addr2, _) ->
                    Addr.compare addr1 addr2 )
                @@ MT.get_all_accounts_rooted_at_exn mt a
                (* can't actually throw *)
              in
              let addresses, accounts = List.unzip addresses_and_accounts in
              if List.is_empty addresses then
                (* Peer should know what portions of the tree are full from the
                   Num_accounts query. *)
                Either.Second
                  ( Actions.Violated_protocol
                  , Some
                      ("Requested empty subtree", [ ("addr", Addr.to_yojson a) ])
                  )
              else
                let first_address, rest_address =
                  (List.hd_exn addresses, List.tl_exn addresses)
                in
                (* Walk the sorted addresses checking that each one is the
                   successor of the previous; stop advancing at the first gap
                   so [missing_address] names the first absent address. *)
                let missing_address, is_compact =
                  List.fold rest_address
                    ~init:(Addr.next first_address, true)
                    ~f:(fun (expected_address, is_compact) actual_address ->
                      if
                        is_compact
                        && [%equal: Addr.t option] expected_address
                             (Some actual_address)
                      then (Addr.next actual_address, true)
                      else (expected_address, false) )
                in
                if not is_compact then (
                  (* indicates our ledger is invalid somehow. *)
                  [%log fatal]
                    ~metadata:
                      [ ( "missing_address"
                        , Addr.to_yojson (Option.value_exn missing_address) )
                      ; ( "addresses_and_accounts"
                        , `List
                            (List.map addresses_and_accounts
                               ~f:(fun (addr, account) ->
                                 `Tuple
                                   [ Addr.to_yojson addr
                                   ; Account.to_yojson account
                                   ] ) ) )
                      ]
                    "Missing an account at address: $missing_address inside \
                     the list: $addresses_and_accounts" ;
                  assert false )
                else Either.First (Answer.Contents_are accounts)
        | Num_accounts ->
            let len = MT.num_accounts mt in
            let height = Int.ceil_log2 len in
            (* FIXME: bug when height=0 https://github.com/o1-labs/nanobit/issues/365 *)
            (* Descend along left children from the root until reaching the
               node whose subtree has height [height]. *)
            let content_root_addr =
              funpow
                (MT.depth mt - height)
                (fun a ->
                  Addr.child_exn ~ledger_depth a Mina_stdlib.Direction.Left )
                (Addr.root ())
            in
            Either.First
              (Num_accounts
                 (len, MT.get_inner_hash_at_addr_exn mt content_root_addr) )
        | What_child_hashes (a, subtree_depth) -> (
            match subtree_depth with
            | n when n >= 1 -> (
                (* Never serve more than the configured maximum depth. *)
                let subtree_depth =
                  min n ledger_sync_config.max_subtree_depth
                in
                match
                  Or_error.try_with (fun () ->
                      (* The address expansion is kept inside [try_with]:
                         [intermediate_range] goes through [Addr.extend_exn],
                         so an address that cannot be extended by
                         [subtree_depth] bits must land in the error branch
                         below instead of escaping as an exception. *)
                      let addresses =
                        intermediate_range ledger_depth a subtree_depth
                      in
                      let get_hash a = MT.get_inner_hash_at_addr_exn mt a in
                      let hashes = Array.map addresses ~f:get_hash in
                      Answer.Child_hashes_are hashes )
                with
                | Ok answer ->
                    Either.First answer
                | Error e ->
                    [%log error]
                      ~metadata:[ ("error", Error_json.error_to_yojson e) ]
                      "When handling What_child_hashes request, the following \
                       error happended: $error" ;
                    Either.Second
                      ( Actions.Violated_protocol
                      , Some
                          ( "Invalid address in What_child_hashes request"
                          , [ ("addr", Addr.to_yojson a) ] ) ) )
            | _ ->
                [%log error]
                  "When handling What_child_hashes request, the depth was \
                   outside the valid range" ;
                Either.Second
                  ( Actions.Violated_protocol
                  , Some
                      ( "Invalid depth requested in What_child_hashes request"
                      , [ ("addr", Addr.to_yojson a) ] ) ) )
      in

      match response_or_punish with
      | Either.First answer ->
          Deferred.return @@ Ok answer
      | Either.Second action ->
          let%map _ =
            record_envelope_sender trust_system logger sender action
          in
          (* Surface the human-readable reason of the violation, if any. *)
          let err =
            Option.value_map ~default:"Violated protocol" (snd action) ~f:fst
          in
          Or_error.error_string err
  end
451

452
  (* Syncer state. ['a] is the type of the auxiliary data stored alongside
     the desired root. *)
  type 'a t =
453
    { mutable desired_root : Root_hash.t option
454
    ; mutable auxiliary_data : 'a option
455
    ; tree : MT.t
456
    ; trust_system : Trust_system.t
457
    ; answers :
458
        (Root_hash.t * query * answer Envelope.Incoming.t) Linear_pipe.Reader.t
459
    ; answer_writer :
460
        (Root_hash.t * query * answer Envelope.Incoming.t) Linear_pipe.Writer.t
461
    ; queries : (Root_hash.t * query) Linear_pipe.Writer.t
462
    ; query_reader : (Root_hash.t * query) Linear_pipe.Reader.t
463
    ; waiting_parents : Hash.t Addr.Table.t
464
          (** Addresses we are waiting for the children of, and the expected
465
              hash of the node with the address. *)
466
    ; waiting_content : Hash.t Addr.Table.t
          (** Addresses we are waiting for the accounts of, and the expected
              hash of the node with the address. *)
467
    ; mutable validity_listener :
468
        [ `Ok | `Target_changed of Root_hash.t option * Root_hash.t ] Ivar.t
469
    ; context : (module CONTEXT)
470
    }
471

472
  (* Stub required by [S]'s [@@deriving sexp]; always raises [Failure]. *)
  let t_of_sexp _ = failwith "t_of_sexp: not implemented"
×
473

474
  (* Stub required by [S]'s [@@deriving sexp]; always raises [Failure]. *)
  let sexp_of_t _ = failwith "sexp_of_t: not implemented"
×
475

476
  (* The root hash currently being synced to; raises if none has been set. *)
  let desired_root_exn t = Option.value_exn t.desired_root
44✔
477

478
  (* Shuts the syncer down by closing the read ends of its answer pipe and
     its query pipe. *)
  let destroy { answers; query_reader; _ } =
    Linear_pipe.close_read answers ;
    Linear_pipe.close_read query_reader
×
481

482
  (* Accessor for the pipe end on which answers are fed to the syncer. *)
  let answer_writer { answer_writer; _ } = answer_writer
4✔
483

484
  (* Accessor for the pipe end from which the syncer's queries are read. *)
  let query_reader { query_reader; _ } = query_reader
4✔
485

486
  (* Registers [parent_addr] in [waiting_parents] with the hash its children
     must merge to. Raises if the address is already registered. *)
  let expect_children : 'a t -> Addr.t -> Hash.t -> unit =
   fun t parent_addr expected ->
    let open (val t.context) in
    let metadata =
      [ ("parent_address", Addr.to_yojson parent_addr)
      ; ("hash", Hash.to_yojson expected)
      ]
    in
    [%log trace] ~metadata
      "Expecting children parent $parent_address, expected: $hash" ;
    Addr.Table.add_exn t.waiting_parents ~key:parent_addr ~data:expected
8✔
496

497
  (* Registers [addr] in [waiting_content] with the hash its accounts must
     produce. Raises if the address is already registered. *)
  let expect_content : 'a t -> Addr.t -> Hash.t -> unit =
   fun t addr expected ->
    let open (val t.context) in
    let metadata =
      [ ("address", Addr.to_yojson addr); ("hash", Hash.to_yojson expected) ]
    in
    [%log trace] ~metadata "Expecting content addr $address, expected: $hash" ;
    Addr.Table.add_exn t.waiting_content ~key:addr ~data:expected
6✔
505

506
  (** Writes the accounts [content] into the subtree rooted at [addr], stops
      waiting for that address, and reports whether the resulting inner hash
      equals the one registered for it. Raises if [addr] is not in
      [waiting_content]. *)
  let add_content :
         'a t
      -> Addr.t
      -> Account.t list
      -> [ `Success
         | `Hash_mismatch of Hash.t * Hash.t  (** expected hash, actual *) ] =
   fun t addr content ->
    let open (val t.context) in
    let expected = Addr.Table.find_exn t.waiting_content addr in
    (* TODO #444 should we batch all the updates and do them at the end? *)
    (* The data written here may be wrong; in that case the caller requeues
       the address and a later answer overwrites it. *)
    MT.set_all_accounts_rooted_at_exn t.tree addr content ;
    Addr.Table.remove t.waiting_content addr ;
    let metadata =
      [ ("address", Addr.to_yojson addr); ("hash", Hash.to_yojson expected) ]
    in
    [%log trace] ~metadata
      "Found content addr $address, with hash $hash, removing from waiting \
       content" ;
    match MT.get_inner_hash_at_addr_exn t.tree addr with
    | actual when Hash.equal actual expected ->
        `Success
    | actual ->
        `Hash_mismatch (expected, actual)
×
530

531
  (* Merges every pair of adjacent nodes at the given height, producing an
     array half as long. Fails if the input length is odd. *)
  let merge_siblings : Hash.t array -> index -> Hash.t array =
   fun nodes height ->
    let count = Array.length nodes in
    if count land 1 <> 0 then failwith "length must be even" ;
    Array.init (count / 2) ~f:(fun pair ->
        let left = 2 * pair in
        Hash.merge ~height nodes.(left) nodes.(left + 1) )
539

540
  (* Repeatedly merges sibling pairs, going up one height per round, until a
     single hash is left. Assumes the number of nodes is a power of 2. *)
  let rec merge_many : Hash.t array -> index -> Hash.t =
   fun nodes height ->
    if Array.length nodes = 1 then nodes.(0)
    else merge_many (merge_siblings nodes height) (height + 1)
24✔
550

551
  (* [merge_many nodes height subtree_depth] merges [nodes] into their common
     root; the first round of merging happens at [height - subtree_depth]. *)
  let merge_many : Hash.t array -> index -> index -> Hash.t =
   fun nodes height subtree_depth -> merge_many nodes (height - subtree_depth)
8✔
556

557
  (* Checks the hashes [nodes] received for the subtree rooted at [addr]
     against the hash registered for [addr] in [waiting_parents].

     [nodes] must hold [2^k] hashes with [1 <= k <= requested_depth];
     anything else, including an empty array, is [`Invalid_length]. When the
     hashes merge to the expected hash, [addr] is removed from
     [waiting_parents] and the result is [`Good] with the descendants whose
     hash differs from the one in the local tree, i.e. the nodes still to be
     fetched. Otherwise the result is [`Hash_mismatch (expected, merged)].

     Raises if [addr] is not registered in [waiting_parents]. *)
  let add_subtree :
         'a t
      -> Addr.t
      -> Hash.t array
      -> int
      -> [ `Good of (Addr.t * Hash.t) array
         | `Hash_mismatch of Hash.t * Hash.t
         | `Invalid_length ] =
   fun t addr nodes requested_depth ->
    let len = Array.length nodes in
    (* [Int.is_pow2] and [Int.ceil_log2] raise on non-positive arguments, so
       the lower bound has to be tested first: [&&] short-circuits and an
       empty array sent by a peer is rejected instead of raising. *)
    let valid_length =
      len >= 2 && Int.is_pow2 len && Int.ceil_log2 len <= requested_depth
    in
    if not valid_length then `Invalid_length
    else
      let subtree_depth = Int.ceil_log2 len in
      let ledger_depth = MT.depth t.tree in
      let expected =
        Option.value_exn ~message:"Forgot to wait for a node"
          (Addr.Table.find t.waiting_parents addr)
      in
      let merged =
        merge_many nodes (ledger_depth - Addr.depth addr) subtree_depth
      in
      if Hash.equal expected merged then (
        Addr.Table.remove t.waiting_parents addr ;
        let addresses = intermediate_range ledger_depth addr subtree_depth in
        let addresses_and_hashes = Array.zip_exn addresses nodes in

        (* Filter to fetch only those that differ *)
        let should_fetch_children addr hash =
          not @@ Hash.equal (MT.get_inner_hash_at_addr_exn t.tree addr) hash
        in
        let subtrees_to_fetch =
          addresses_and_hashes
          |> Array.filter ~f:(Tuple2.uncurry should_fetch_children)
        in
        `Good subtrees_to_fetch )
      else `Hash_mismatch (expected, merged)
×
600

601
  (* Called once syncing has finished: fails unless the tree's merkle root
     equals the desired root, otherwise fills [validity_listener] with [`Ok]
     (logging an error first if it was already full). *)
  let all_done t =
    let open (val t.context) in
    let synced =
      Root_hash.equal (MT.merkle_root t.tree) (desired_root_exn t)
    in
    if synced then (
      if Ivar.is_full t.validity_listener then
        [%log error] "Ivar.fill bug is here!" ;
      Ivar.fill t.validity_listener `Ok )
    else failwith "We finished syncing, but made a mistake somewhere :("
4✔
609

610
  (** Hash of an empty tree of height [h]: starts from [Hash.empty_account]
      and merges the running hash with itself once per level from 0 up to
      [h - 1]. *)
  let empty_hash_at_height h =
    let rec climb level acc =
      if level <> h then climb (level + 1) (Hash.merge ~height:level acc acc)
      else acc
    in
    climb 0 Hash.empty_account
616

617
  (** Given the hash of the smallest subtree that contains all accounts, the
      height of that hash in the tree ([start_height]) and the height of the
      whole tree ([result_height]), compute the hash of the whole tree by
      merging, at each level, the running hash on the left with the empty
      subtree of the same height on the right.

      When [start_height >= result_height] there is nothing to add and [hash]
      is returned unchanged. *)
  let complete_with_empties hash start_height result_height =
    let rec go cur_empty prev_hash height =
      (* [>=] rather than [=]: a [start_height] above [result_height] (which
         can be derived from a peer-supplied account count) would otherwise
         never meet the stop condition and the loop would not terminate. *)
      if height >= result_height then prev_hash
      else
        let cur = Hash.merge ~height prev_hash cur_empty in
        let next_empty = Hash.merge ~height cur_empty cur_empty in
        go next_empty cur (height + 1)
    in
    go (empty_hash_at_height start_height) hash start_height
4✔
629

630
  (** Given an address and the expected hash of the subtree rooted there,
      register the expectation and queue the query that fetches it: the
      accounts themselves when the address is within
      [account_subtree_height] of the leaves, child hashes otherwise. *)
  let handle_node t addr exp_hash =
    let open (val t.context) in
    let at_account_subtree =
      Addr.depth addr >= MT.depth t.tree - account_subtree_height
    in
    let next_query =
      if at_account_subtree then (
        expect_content t addr exp_hash ;
        Query.What_contents addr )
      else (
        expect_children t addr exp_hash ;
        Query.What_child_hashes
          (addr, ledger_sync_config.default_subtree_depth) )
    in
    Linear_pipe.write_without_pushback_if_open t.queries
      (desired_root_exn t, next_query)
644

645
  (** Handle the initial Num_accounts message, starting the main syncing
      process.

      [n] is the account count claimed by the peer and [content_hash] the
      claimed hash of the smallest subtree holding all accounts. The implied
      root hash is computed by completing [content_hash] with empty subtrees;
      if it equals the desired root, all pending expectations are cleared and
      syncing restarts from the root ([`Success]). Otherwise the result is
      [`Hash_mismatch (desired, implied)]. *)
  let handle_num_accounts :
      'a t -> int -> Hash.t -> [ `Success | `Hash_mismatch of Hash.t * Hash.t ]
      =
   fun t n content_hash ->
    let rh = Root_hash.to_hash (desired_root_exn t) in
    let depth = MT.depth t.tree in
    (* [n] comes from a peer and cannot be trusted: [Int.ceil_log2] raises on
       non-positive input, and a count above [2^depth] would give a content
       root higher than the tree itself. Clamp the height to [0, depth]; for
       any count that fits in the tree this is exactly [Int.ceil_log2 n], and
       a bogus count is still caught by the hash comparison below. *)
    let height = if n <= 1 then 0 else Int.min depth (Int.ceil_log2 n) in
    (* FIXME: bug when height=0 https://github.com/o1-labs/nanobit/issues/365 *)
    let actual = complete_with_empties content_hash height depth in
    if Hash.equal actual rh then (
      Addr.Table.clear t.waiting_parents ;
      (* We should use this information to set the empty account slots empty and
         start syncing at the content root. See #1972. *)
      Addr.Table.clear t.waiting_content ;
      handle_node t (Addr.root ()) rh ;
      `Success )
    else `Hash_mismatch (rh, actual)
×
663

664
  let main_loop t =
665
    let open (val t.context) in
4✔
666
    let handle_answer :
667
           Root_hash.t
668
           * Addr.t Query.t
669
           * (Hash.t, Account.t) Answer.t Envelope.Incoming.t
670
        -> unit Deferred.t =
671
     fun (root_hash, query, env) ->
672
      (* NOTE: think about synchronization here. This is deferred now, so
673
         the t and the underlying ledger can change while processing is
674
         happening. *)
675
      let already_done =
18✔
676
        match Ivar.peek t.validity_listener with Some `Ok -> true | _ -> false
×
677
      in
678
      let sender = Envelope.Incoming.sender env in
679
      let answer = Envelope.Incoming.data env in
18✔
680
      [%log trace]
18✔
681
        ~metadata:
682
          [ ("root_hash", Root_hash.to_yojson root_hash)
18✔
683
          ; ("query", Query.to_yojson Addr.to_yojson query)
18✔
684
          ]
685
        "Handle answer for $root_hash" ;
686
      if not (Root_hash.equal root_hash (desired_root_exn t)) then (
×
687
        [%log trace]
×
688
          ~metadata:
689
            [ ("desired_hash", Root_hash.to_yojson (desired_root_exn t))
×
690
            ; ("ignored_hash", Root_hash.to_yojson root_hash)
×
691
            ]
692
          "My desired root was $desired_hash, so I'm ignoring $ignored_hash" ;
693
        Deferred.unit )
×
694
      else if already_done then (
×
695
        (* This can happen if we asked for hashes that turn out to be equal in
696
           underlying ledger and the target. *)
697
        [%log debug] "Got sync response when we're already finished syncing" ;
×
698
        Deferred.unit )
×
699
      else
700
        let open Trust_system in
18✔
701
        (* If a peer misbehaves we still need the information we asked them for,
702
           so requeue in that case. *)
703
        let requeue_query () =
704
          Linear_pipe.write_without_pushback_if_open t.queries (root_hash, query)
×
705
        in
706
        let credit_fulfilled_request () =
707
          record_envelope_sender t.trust_system logger sender
18✔
708
            ( Actions.Fulfilled_request
709
            , Some
710
                ( "sync ledger query $query"
711
                , [ ("query", Query.to_yojson Addr.to_yojson query) ] ) )
18✔
712
        in
713
        let%bind _ =
714
          match (query, answer) with
715
          | Query.What_contents addr, Answer.Contents_are leaves -> (
6✔
716
              match add_content t addr leaves with
717
              | `Success ->
6✔
718
                  credit_fulfilled_request ()
6✔
719
              | `Hash_mismatch (expected, actual) ->
×
720
                  let%map () =
721
                    record_envelope_sender t.trust_system logger sender
×
722
                      ( Actions.Sent_bad_hash
723
                      , Some
724
                          ( "sent accounts $accounts for address $addr, they \
725
                             hash to $actual but we expected $expected"
726
                          , [ ( "accounts"
727
                              , `List (List.map ~f:Account.to_yojson leaves) )
×
728
                            ; ("addr", Addr.to_yojson addr)
×
729
                            ; ("actual", Hash.to_yojson actual)
×
730
                            ; ("expected", Hash.to_yojson expected)
×
731
                            ] ) )
732
                  in
733
                  requeue_query () )
×
734
          | Query.Num_accounts, Answer.Num_accounts (count, content_root) -> (
4✔
735
              match handle_num_accounts t count content_root with
736
              | `Success ->
4✔
737
                  credit_fulfilled_request ()
4✔
738
              | `Hash_mismatch (expected, actual) ->
×
739
                  let%map () =
740
                    record_envelope_sender t.trust_system logger sender
×
741
                      ( Actions.Sent_bad_hash
742
                      , Some
743
                          ( "Claimed num_accounts $count, content root hash \
744
                             $content_root_hash, that implies a root hash of \
745
                             $actual, we expected $expected"
746
                          , [ ("count", `Int count)
747
                            ; ("content_root_hash", Hash.to_yojson content_root)
×
748
                            ; ("actual", Hash.to_yojson actual)
×
749
                            ; ("expected", Hash.to_yojson expected)
×
750
                            ] ) )
751
                  in
752
                  requeue_query () )
×
753
          | ( Query.What_child_hashes (address, requested_depth)
8✔
754
            , Answer.Child_hashes_are hashes ) -> (
755
              match add_subtree t address hashes requested_depth with
756
              | `Hash_mismatch (expected, actual) ->
×
757
                  let%map () =
758
                    record_envelope_sender t.trust_system logger sender
×
759
                      ( Actions.Sent_bad_hash
760
                      , Some
761
                          ( "hashes sent for subtree on address $address merge \
762
                             to $actual_merge but we expected $expected_merge"
763
                          , [ ("actual_merge", Hash.to_yojson actual)
×
764
                            ; ("expected_merge", Hash.to_yojson expected)
×
765
                            ] ) )
766
                  in
767
                  requeue_query ()
×
768
              | `Invalid_length ->
×
769
                  let%map () =
770
                    record_envelope_sender t.trust_system logger sender
×
771
                      ( Actions.Sent_bad_hash
772
                      , Some
773
                          ( "hashes sent for subtree on address $address must \
774
                             be a power of 2 in the range 2-2^$depth"
775
                          , [ ( "depth"
776
                              , `Int ledger_sync_config.max_subtree_depth )
777
                            ] ) )
778
                  in
779
                  requeue_query ()
×
780
              | `Good children_to_verify ->
8✔
781
                  Array.iter children_to_verify ~f:(fun (addr, hash) ->
782
                      handle_node t addr hash ) ;
10✔
783
                  credit_fulfilled_request () )
8✔
784
          | query, answer ->
×
785
              let%map () =
786
                record_envelope_sender t.trust_system logger sender
×
787
                  ( Actions.Violated_protocol
788
                  , Some
789
                      ( "Answered question we didn't ask! Query was $query \
790
                         answer was $answer"
791
                      , [ ("query", Query.to_yojson Addr.to_yojson query)
×
792
                        ; ( "answer"
793
                          , Answer.to_yojson Hash.to_yojson Account.to_yojson
×
794
                              answer )
795
                        ] ) )
796
              in
797
              requeue_query ()
×
798
        in
799
        if
18✔
800
          Root_hash.equal
801
            (Option.value_exn t.desired_root)
18✔
802
            (MT.merkle_root t.tree)
18✔
803
        then (
4✔
804
          [%str_log trace] Snarked_ledger_synced ;
4✔
805
          all_done t ) ;
4✔
806
        Deferred.unit
18✔
807
    in
808
    Linear_pipe.iter t.answers ~f:handle_answer
809

810
  (** [new_goal t h ~data ~equal] points the sync at root hash [h], carrying
      the auxiliary [data] along with it.

      - If [h] is already the desired root and the stored auxiliary data is
        [equal] to [data], nothing is changed and [`Repeat] is returned.
      - If [h] is already the desired root but the stored auxiliary data
        differs (or none is stored), only the auxiliary data is replaced and
        [`Update_data] is returned.
      - Otherwise the current validity listener is resolved with
        [`Target_changed] (if still empty), a fresh listener is installed, the
        desired root and auxiliary data are overwritten, a [Num_accounts]
        query for [h] is written to the query pipe, and [`New] is returned. *)
  let new_goal t h ~data ~equal =
    let open (val t.context) in
    match t.desired_root with
    | Some current when Root_hash.equal h current -> (
        (* Same target as before: only the auxiliary data may need a refresh. *)
        match t.auxiliary_data with
        | Some saved_data when equal data saved_data ->
            [%log debug] "New_goal to same hash, not doing anything" ;
            `Repeat
        | Some _ | None ->
            t.auxiliary_data <- Some data ;
            `Update_data )
    | previous_root ->
        (* Either no target was set yet, or the target is changing. Only the
           latter is worth a log line. *)
        ( match previous_root with
        | None ->
            ()
        | Some root_hash ->
            [%log debug]
              ~metadata:
                [ ("old_root_hash", Root_hash.to_yojson root_hash)
                ; ("new_root_hash", Root_hash.to_yojson h)
                ]
              "New_goal: changing target from $old_root_hash to $new_root_hash"
        ) ;
        (* Wake up anyone waiting on the old target before swapping in a
           fresh listener for the new one. *)
        Ivar.fill_if_empty t.validity_listener
          (`Target_changed (previous_root, h)) ;
        t.validity_listener <- Ivar.create () ;
        t.desired_root <- Some h ;
        t.auxiliary_data <- Some data ;
        Linear_pipe.write_without_pushback_if_open t.queries (h, Num_accounts) ;
        `New
843

844
  (** [valid_tree t] waits for the current validity listener to resolve. On
      [`Ok] it yields the tree paired with the stored auxiliary data
      ([Option.value_exn] raises if none is stored). On [`Target_changed] it
      starts over, reading whatever listener [t] holds at that point. *)
  let rec valid_tree t =
    Deferred.bind (Ivar.read t.validity_listener) ~f:(function
      | `Target_changed _ ->
          valid_tree t
      | `Ok ->
          return (t.tree, Option.value_exn t.auxiliary_data) )
850

851
  (** [peek_valid_tree t] inspects the validity listener without waiting:
      [Some t.tree] if it has already resolved to [`Ok], [None] if it is still
      empty or resolved to [`Target_changed]. *)
  let peek_valid_tree t =
    match Ivar.peek t.validity_listener with
    | Some `Ok ->
        Some t.tree
    | Some (`Target_changed _) | None ->
        None
857

858
  (** [wait_until_valid t h] resolves once the sync towards [h] has an outcome.

      If [h] is not the root currently being synced to, it resolves
      immediately with [`Target_changed (t.desired_root, h)]. Otherwise it
      waits on the validity listener and yields [`Ok t.tree] on success, or
      forwards the listener's [`Target_changed] payload.

      NOTE(review): [desired_root_exn] presumably raises when no target has
      been set yet; confirm against its definition. *)
  let wait_until_valid t h =
    (* Re-tags the listener's result, attaching the tree on success. *)
    let on_resolved = function
      | `Ok ->
          `Ok t.tree
      | `Target_changed payload ->
          `Target_changed payload
    in
    if Root_hash.equal h (desired_root_exn t) then
      Deferred.map (Ivar.read t.validity_listener) ~f:on_resolved
    else return (`Target_changed (t.desired_root, h))
867

868
  (** [fetch t rh ~data ~equal] sets [rh] as the sync target via [new_goal]
      (its [`New] / [`Repeat] / [`Update_data] result is deliberately
      discarded) and then waits with [wait_until_valid t rh]. *)
  let fetch t rh ~data ~equal =
    let (_ : [ `New | `Repeat | `Update_data ]) =
      new_goal t rh ~data ~equal
    in
    wait_until_valid t rh
871

872
  (** [create mt ~context ~trust_system] builds a sync state around the tree
      [mt]: no desired root or auxiliary data yet, fresh query and answer
      pipes, empty [waiting_parents] / [waiting_content] tables and an
      unfilled validity listener. [main_loop] is started on the new state
      without being awaited, and the state is returned. *)
  let create mt ~context ~trust_system =
    (* Pipe ends are named after the record fields they populate. *)
    let query_reader, queries = Linear_pipe.create () in
    let answers, answer_writer = Linear_pipe.create () in
    let t =
      { desired_root = None
      ; auxiliary_data = None
      ; tree = mt
      ; trust_system
      ; answers
      ; answer_writer
      ; queries
      ; query_reader
      ; waiting_parents = Addr.Table.create ()
      ; waiting_content = Addr.Table.create ()
      ; validity_listener = Ivar.create ()
      ; context
      }
    in
    don't_wait_for (main_loop t) ;
    t
4✔
892

893
  (* Unimplemented placeholder: both arguments are ignored and every call
     raises [Failure "todo"]. Need some interface for the diffs, not sure the
     layering is right here. *)
  let apply_or_queue_diff _ _ = failwith "todo"
×
896

897
  (* Not supported here: the argument is ignored and every call raises
     [Failure "no"]. *)
  let merkle_path_at_addr _ignored = failwith "no"
×
898

899
  (* Not supported here: the argument is ignored and every call raises
     [Failure "no"]. *)
  let get_account_at_addr _ignored = failwith "no"
×
900
end
48✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc