• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

MinaProtocol / mina / 411

24 Jul 2025 03:14PM UTC coverage: 33.188% (-27.7%) from 60.871%
411

push

buildkite

web-flow
Merge pull request #17541 from MinaProtocol/brian/merge-compatible-into-develop

Merge compatible into develop

164 of 702 new or added lines in 96 files covered. (23.36%)

18243 existing lines in 393 files now uncovered.

23983 of 72264 relevant lines covered (33.19%)

24667.26 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

3.42
/src/lib/syncable_ledger/syncable_ledger.ml
1
open Core_kernel
81✔
2
open Async_kernel
3
open Pipe_lib
4
open Network_peer
5

6
(* Adds the [Snarked_ledger_synced] constructor to the extensible type
   [Structured_log_events.t]; the [register_event] deriver associates it with
   the log message given below. *)
type Structured_log_events.t += Snarked_ledger_synced
UNCOV
7
  [@@deriving register_event { msg = "Snarked database sync'd. All done" }]
×
8

9
(** [funpow n f r] applies [f] to [r] [n] times in a row,
    e.g. [funpow 3 f r = f (f (f r))]. When [n] is not positive, [r] is
    returned untouched. *)
let funpow n f r =
  let rec apply remaining acc =
    if remaining <= 0 then acc else apply (remaining - 1) (f acc)
  in
  apply n r
×
12

13
(* Queries about a ledger, parameterised by the address type. Two versions are
   kept inside the [%%versioned] block: V2 lets [What_child_hashes] carry an
   integer depth next to the address, V1 carries the address only. *)
module Query = struct
14
  [%%versioned
15
  module Stable = struct
16
    module V2 = struct
17
      type 'addr t =
81✔
18
        | What_child_hashes of 'addr * int
×
19
            (** What are the hashes of the children of this address? 
20
            If depth > 1 then we get the leaves of a subtree rooted
21
            at address and of the given depth. 
22
            For depth = 1 we have the simplest case with just the 2
23
            direct children.
24
            *)
UNCOV
25
        | What_contents of 'addr
×
26
            (** What accounts are at this address? addr must have depth
27
            tree_depth - account_subtree_height *)
UNCOV
28
        | Num_accounts
×
29
            (** How many accounts are there? Used to size data structure and
30
            figure out what part of the tree is filled in. *)
31
      [@@deriving sexp, yojson, hash, compare]
243✔
32
    end
33

34
    module V1 = struct
35
      type 'addr t =
81✔
36
        | What_child_hashes of 'addr
×
37
            (** What are the hashes of the children of this address? *)
38
        | What_contents of 'addr
×
39
            (** What accounts are at this address? addr must have depth
40
            tree_depth - account_subtree_height *)
41
        | Num_accounts
×
42
            (** How many accounts are there? Used to size data structure and
43
            figure out what part of the tree is filled in. *)
44
      [@@deriving sexp, yojson, hash, compare]
243✔
45

46
      (* Upgrades a V1 query to V2: a V1 [What_child_hashes] becomes a request
         of depth 1 (the two direct children); the other constructors map to
         their V2 counterparts unchanged. *)
      let to_latest : 'a t -> 'a V2.t = function
47
        | What_child_hashes a ->
×
48
            What_child_hashes (a, 1)
49
        | What_contents a ->
×
50
            What_contents a
51
        | Num_accounts ->
×
52
            Num_accounts
53
    end
54
  end]
55
end
56

57
(* Answers to [Query] messages, parameterised by the hash and account types.
   V2 returns child hashes as a bounded array, V1 as exactly one pair. *)
module Answer = struct
58
  [%%versioned
59
  module Stable = struct
60
    module V2 = struct
61
      type ('hash, 'account) t =
81✔
NEW
62
        | Child_hashes_are of
×
NEW
63
            'hash Mina_stdlib.Bounded_types.ArrayN4000.Stable.V1.t
×
64
            (** The requested addresses' children have these hashes.
65
            May be any power of 2 number of children, and not necessarily 
66
            immediate children  *)
67
        | Contents_are of 'account list
×
68
            (** The requested address has these accounts *)
69
        | Num_accounts of int * 'hash
×
70
            (** There are this many accounts and the smallest subtree that
71
                contains all non-empty nodes has this hash. *)
72
      [@@deriving sexp, yojson]
243✔
73
    end
74

75
    module V1 = struct
76
      type ('hash, 'account) t =
81✔
77
        | Child_hashes_are of 'hash * 'hash
×
78
            (** The requested address's children have these hashes **)
79
        | Contents_are of 'account list
×
80
            (** The requested address has these accounts *)
81
        | Num_accounts of int * 'hash
×
82
            (** There are this many accounts and the smallest subtree that
83
                contains all non-empty nodes has this hash. *)
84
      [@@deriving sexp, yojson]
243✔
85

86
      (* Upgrades a V1 answer to V2: the pair of child hashes becomes a
         two-element array and every account is converted with
         [acct_to_latest]. *)
      let to_latest acct_to_latest = function
87
        | Child_hashes_are (h1, h2) ->
×
88
            V2.Child_hashes_are [| h1; h2 |]
89
        | Contents_are accts ->
×
90
            V2.Contents_are (List.map ~f:acct_to_latest accts)
×
91
        | Num_accounts (i, h) ->
×
92
            V2.Num_accounts (i, h)
93

94
      (* Not a standard versioning function *)
95

96
      (** Attempts to downgrade v2 -> v1 *)
97
      (* Only a [Child_hashes_are] array of exactly two hashes fits the V1
         pair; any other length yields an [Error]. The other constructors
         always downgrade successfully. *)
      let from_v2 : ('a, 'b) V2.t -> ('a, 'b) t Or_error.t = function
98
        | Child_hashes_are h ->
×
99
            if Array.length h = 2 then Ok (Child_hashes_are (h.(0), h.(1)))
×
100
            else Or_error.error_string "can't downgrade wide query"
×
101
        | Contents_are accs ->
×
102
            Ok (Contents_are accs)
103
        | Num_accounts (n, h) ->
×
104
            Ok (Num_accounts (n, h))
105
    end
106
  end]
107
end
108

109
(* Depth settings for [What_child_hashes] subtree requests. In this file,
   [max_subtree_depth] caps the depth a responder will serve and
   [default_subtree_depth] is the depth the syncer asks for. *)
type daemon_config = { max_subtree_depth : int; default_subtree_depth : int }
110

111
(** Builds a [daemon_config]. Each of [max_subtree_depth] and
    [default_subtree_depth] is an option: [Some d] is used as given, [None]
    falls back to the matching [sync_ledger_*] field of [compile_config]. *)
let create_config ~(compile_config : Mina_compile_config.t) ~max_subtree_depth
    ~default_subtree_depth () =
  let max_subtree_depth =
    match max_subtree_depth with
    | Some depth ->
        depth
    | None ->
        compile_config.sync_ledger_max_subtree_depth
  in
  let default_subtree_depth =
    match default_subtree_depth with
    | Some depth ->
        depth
    | None ->
        compile_config.sync_ledger_default_subtree_depth
  in
  { max_subtree_depth; default_subtree_depth }
120

121
(* Ambient values passed around as a first-class module: the logger used by
   the [%log ...] statements and the ledger sync depth settings. *)
module type CONTEXT = sig
122
  val logger : Logger.t
123

124
  val ledger_sync_config : daemon_config
125
end
126

127
(* Argument signature of the [Make] functor: the address, account, hash and
   root-hash modules, the syncable merkle tree [MT] built over them, and the
   height of the subtrees that are transferred as whole account lists. *)
module type Inputs_intf = sig
128
  module Addr : module type of Merkle_address
129

130
  module Account : sig
131
    type t [@@deriving bin_io, sexp, yojson]
132
  end
133

134
  module Hash : Merkle_ledger.Intf.Hash with type account := Account.t
135

136
  module Root_hash : sig
137
    type t [@@deriving equal, sexp, yojson]
138

139
    val to_hash : t -> Hash.t
140
  end
141

142
  module MT :
143
    Merkle_ledger.Intf.SYNCABLE
144
      with type hash := Hash.t
145
       and type root_hash := Root_hash.t
146
       and type addr := Addr.t
147
       and type account := Account.t
148

149
  (* Used in two places in this file: a responder refuses [What_contents] for
     an address whose height exceeds this value, and the syncer switches from
     asking for child hashes to asking for contents once an address is at
     depth [MT.depth - account_subtree_height] or deeper. *)
  val account_subtree_height : int
150
end
151

152
(* Interface of a ledger syncer; the result of [Make] is constrained to this
   signature with its abstract types substituted by the functor inputs.
   NOTE(review): the implementations of several values below ([create],
   [new_goal], [valid_tree], [fetch], ...) are not visible in this view, so
   they are left undocumented here rather than guessed at. *)
module type S = sig
153
  type 'a t [@@deriving sexp]
154

155
  type merkle_tree
156

157
  type merkle_path
158

159
  type hash
160

161
  type root_hash
162

163
  type addr
164

165
  type diff
166

167
  type account
168

169
  type index = int
170

171
  type query
172

173
  type answer
174

175
  (* Answers queries received from peers out of a given merkle tree. *)
  module Responder : sig
176
    type t
177

178
    (* The [query -> unit] callback is invoked with each incoming query
       before it is answered. *)
    val create :
179
         merkle_tree
180
      -> (query -> unit)
181
      -> context:(module CONTEXT)
182
      -> trust_system:Trust_system.t
183
      -> t
184

185
    (* Returns the answer, or an [Error] when the query is judged a protocol
       violation (which is also recorded against the sender in the trust
       system). *)
    val answer_query :
186
      t -> query Envelope.Incoming.t -> answer Or_error.t Deferred.t
187
  end
188

189
  val create :
190
       merkle_tree
191
    -> context:(module CONTEXT)
192
    -> trust_system:Trust_system.t
193
    -> 'a t
194

195
  (* Writer on which answers are handed to the syncer, each tagged with the
     root hash and the query it responds to. *)
  val answer_writer :
196
       'a t
197
    -> (root_hash * query * answer Envelope.Incoming.t) Linear_pipe.Writer.t
198

199
  (* Reader of (root hash, query) pairs; presumably the read end of the pipe
     the syncer writes its outgoing queries to -- TODO confirm against
     [create]. *)
  val query_reader : 'a t -> (root_hash * query) Linear_pipe.Reader.t
200

201
  (* Closes the syncer's read ends of the answer pipe and the query pipe. *)
  val destroy : 'a t -> unit
202

203
  val new_goal :
204
       'a t
205
    -> root_hash
206
    -> data:'a
207
    -> equal:('a -> 'a -> bool)
208
    -> [ `Repeat | `New | `Update_data ]
209

210
  val peek_valid_tree : 'a t -> merkle_tree option
211

212
  val valid_tree : 'a t -> (merkle_tree * 'a) Deferred.t
213

214
  val wait_until_valid :
215
       'a t
216
    -> root_hash
217
    -> [ `Ok of merkle_tree | `Target_changed of root_hash option * root_hash ]
218
       Deferred.t
219

220
  val fetch :
221
       'a t
222
    -> root_hash
223
    -> data:'a
224
    -> equal:('a -> 'a -> bool)
225
    -> [ `Ok of merkle_tree | `Target_changed of root_hash option * root_hash ]
226
       Deferred.t
227

228
  val apply_or_queue_diff : 'a t -> diff -> unit
229

230
  val merkle_path_at_addr : 'a t -> addr -> merkle_path Or_error.t
231

232
  val get_account_at_addr : 'a t -> addr -> account Or_error.t
233
end
234

235
(*
236

237
Every node of the merkle tree is always in one of three states:
238

239
- Fresh.
240
  The current contents for this node in the MT match what we
241
  expect.
242
- Stale
243
  The current contents for this node in the MT do _not_ match
244
  what we expect.
245
- Unknown.
246
  We don't know what to expect yet.
247

248

249
Although every node conceptually has one of these states, and can
250
make a transition at any time, the syncer operates only along a
251
"frontier" of the tree, which consists of the deepest Stale nodes.
252

253
The goal of the ledger syncer is to make the root node be fresh,
254
starting from it being stale.
255

256
The syncer usually operates exclusively on these frontier nodes
257
and their direct children. However, the goal hash can change
258
while the syncer is running, and at that point every non-root node
259
conceptually becomes Unknown, and we need to restart. However, we
260
don't need to restart completely: in practice, only small portions
261
of the merkle tree change between goals, and we can re-use the "Stale"
262
nodes we already have if the expected hash doesn't change.
263

264
*)
265
(*
266
Note: while syncing, the underlying ledger is in an
267
indeterminate state. We're mutating hashes at internal
268
nodes without updating their children. In fact, we
269
don't even set all the hashes for the internal nodes!
270
(When we hit a height=N subtree, we don't do anything
271
with the hashes in the bottommost N-1 internal nodes).
272
*)
273

274
module Make (Inputs : Inputs_intf) : sig
275
  open Inputs
276

277
  include
278
    S
279
      with type merkle_tree := MT.t
280
       and type hash := Hash.t
281
       and type root_hash := Root_hash.t
282
       and type addr := Addr.t
283
       and type merkle_path := MT.path
284
       and type account := Account.t
285
       and type query := Addr.t Query.t
286
       and type answer := (Hash.t, Account.t) Answer.t
287
end = struct
288
  open Inputs
289

290
  type diff = unit
291

292
  type index = int
293

294
  type answer = (Hash.t, Account.t) Answer.t
295

296
  type query = Addr.t Query.t
297

298
  (* Returns the [2^i] addresses obtained by extending [addr] with each
     [i]-bit index from 0 to [2^i - 1], in increasing index order. *)
  let intermediate_range ledger_depth addr i =
    let descendant_count = 1 lsl i in
    let descendant_at idx =
      Addr.extend_exn ~ledger_depth addr ~num_bits:i (Int64.of_int idx)
    in
    Array.init descendant_count ~f:descendant_at
×
302

303
  module Responder = struct
    (* What a responder needs to serve sync queries:
       - [mt]: the merkle tree the answers are read from;
       - [f]: callback run on every incoming query before it is answered;
       - [context]: provides [logger] and [ledger_sync_config];
       - [trust_system]: where protocol violations by senders are recorded. *)
    type t =
      { mt : MT.t
      ; f : query -> unit
      ; context : (module CONTEXT)
      ; trust_system : Trust_system.t
      }

    (* Packs the arguments into a responder; no other work is done. *)
    let create :
           MT.t
        -> (query -> unit)
        -> context:(module CONTEXT)
        -> trust_system:Trust_system.t
        -> t =
     fun mt f ~context ~trust_system -> { mt; f; context; trust_system }

    (* Answers one query from a peer.

       The query is first passed to the callback [f]. The result is then either
       [Ok answer], or -- when the query is judged a protocol violation -- an
       [Error] carrying the violation message, after the violation has been
       recorded against the sender in the trust system.

       Violations are: [What_contents] on an address higher than
       [account_subtree_height] or on a subtree holding no accounts, and
       [What_child_hashes] with a depth below 1 or with an address for which
       the child addresses or their hashes cannot be computed. The depth of a
       [What_child_hashes] request is capped at
       [ledger_sync_config.max_subtree_depth].

       If the accounts returned for [What_contents] do not occupy consecutive
       addresses, the ledger itself is considered broken: this is logged as
       fatal and the function fails with [assert false]. *)
    let answer_query :
        t -> query Envelope.Incoming.t -> answer Or_error.t Deferred.t =
     fun { mt; f; context; trust_system } query_envelope ->
      let open (val context) in
      let open Trust_system in
      let ledger_depth = MT.depth mt in
      let sender = Envelope.Incoming.sender query_envelope in
      let query = Envelope.Incoming.data query_envelope in
      f query ;
      let response_or_punish =
        match query with
        | What_contents a ->
            if Addr.height ~ledger_depth a > account_subtree_height then
              Either.Second
                ( Actions.Violated_protocol
                , Some
                    ( "Requested too big of a subtree at once"
                    , [ ("addr", Addr.to_yojson a) ] ) )
            else
              let addresses_and_accounts =
                List.sort ~compare:(fun (addr1, _) (addr2, _) ->
                    Addr.compare addr1 addr2 )
                @@ MT.get_all_accounts_rooted_at_exn mt a
                (* can't actually throw *)
              in
              let addresses, accounts = List.unzip addresses_and_accounts in
              if List.is_empty addresses then
                (* Peer should know what portions of the tree are full from the
                   Num_accounts query. *)
                Either.Second
                  ( Actions.Violated_protocol
                  , Some
                      ("Requested empty subtree", [ ("addr", Addr.to_yojson a) ])
                  )
              else
                let first_address, rest_address =
                  (List.hd_exn addresses, List.tl_exn addresses)
                in
                (* Walk the sorted addresses checking that each one is the
                   successor of the previous; on the first gap, remember the
                   address that was expected there. *)
                let missing_address, is_compact =
                  List.fold rest_address
                    ~init:(Addr.next first_address, true)
                    ~f:(fun (expected_address, is_compact) actual_address ->
                      if
                        is_compact
                        && [%equal: Addr.t option] expected_address
                             (Some actual_address)
                      then (Addr.next actual_address, true)
                      else (expected_address, false) )
                in
                if not is_compact then (
                  (* indicates our ledger is invalid somehow. *)
                  [%log fatal]
                    ~metadata:
                      [ ( "missing_address"
                        , Addr.to_yojson (Option.value_exn missing_address) )
                      ; ( "addresses_and_accounts"
                        , `List
                            (List.map addresses_and_accounts
                               ~f:(fun (addr, account) ->
                                 `Tuple
                                   [ Addr.to_yojson addr
                                   ; Account.to_yojson account
                                   ] ) ) )
                      ]
                    "Missing an account at address: $missing_address inside \
                     the list: $addresses_and_accounts" ;
                  assert false )
                else Either.First (Answer.Contents_are accounts)
        | Num_accounts ->
            let len = MT.num_accounts mt in
            let height = Int.ceil_log2 len in
            (* FIXME: bug when height=0 https://github.com/o1-labs/nanobit/issues/365 *)
            (* Address of the smallest left-most subtree of height [height]:
               descend left from the root [MT.depth mt - height] times. *)
            let content_root_addr =
              funpow
                (MT.depth mt - height)
                (fun a ->
                  Addr.child_exn ~ledger_depth a Mina_stdlib.Direction.Left )
                (Addr.root ())
            in
            Either.First
              (Num_accounts
                 (len, MT.get_inner_hash_at_addr_exn mt content_root_addr) )
        | What_child_hashes (a, subtree_depth) -> (
            match subtree_depth with
            | n when n >= 1 -> (
                let subtree_depth =
                  min n ledger_sync_config.max_subtree_depth
                in
                match
                  Or_error.try_with (fun () ->
                      (* The address and depth come from the peer. The child
                         addresses are computed inside [try_with] as well, so
                         that an exception from [Addr.extend_exn] is reported
                         as an invalid request instead of escaping. *)
                      let addresses =
                        intermediate_range ledger_depth a subtree_depth
                      in
                      let get_hash a = MT.get_inner_hash_at_addr_exn mt a in
                      let hashes = Array.map addresses ~f:get_hash in
                      Answer.Child_hashes_are hashes )
                with
                | Ok answer ->
                    Either.First answer
                | Error e ->
                    [%log error]
                      ~metadata:[ ("error", Error_json.error_to_yojson e) ]
                      "When handling What_child_hashes request, the following \
                       error happended: $error" ;
                    Either.Second
                      ( Actions.Violated_protocol
                      , Some
                          ( "Invalid address in What_child_hashes request"
                          , [ ("addr", Addr.to_yojson a) ] ) ) )
            | _ ->
                [%log error]
                  "When handling What_child_hashes request, the depth was \
                   outside the valid range" ;
                Either.Second
                  ( Actions.Violated_protocol
                  , Some
                      ( "Invalid depth requested in What_child_hashes request"
                      , [ ("addr", Addr.to_yojson a) ] ) ) )
      in

      match response_or_punish with
      | Either.First answer ->
          Deferred.return @@ Ok answer
      | Either.Second action ->
          let%map _ =
            record_envelope_sender trust_system logger sender action
          in
          let err =
            Option.value_map ~default:"Violated protocol" (snd action) ~f:fst
          in
          Or_error.error_string err
  end
452

453
  (* Mutable state of one ledger syncer. ['a] is the type of the auxiliary
     data stored next to the target in [auxiliary_data]. *)
  type 'a t =
454
    (* Root hash currently being synced to; [None] means no target is set, in
       which case [desired_root_exn] raises. *)
    { mutable desired_root : Root_hash.t option
455
    ; mutable auxiliary_data : 'a option
456
    ; tree : MT.t
457
    ; trust_system : Trust_system.t
458
    ; answers :
459
        (Root_hash.t * query * answer Envelope.Incoming.t) Linear_pipe.Reader.t
460
    ; answer_writer :
461
        (Root_hash.t * query * answer Envelope.Incoming.t) Linear_pipe.Writer.t
462
    ; queries : (Root_hash.t * query) Linear_pipe.Writer.t
463
    ; query_reader : (Root_hash.t * query) Linear_pipe.Reader.t
464
    ; waiting_parents : Hash.t Addr.Table.t
465
          (** Addresses we are waiting for the children of, and the expected
466
              hash of the node with the address. *)
467
    (* Addresses whose account contents have been requested, mapped to the
       hash the subtree at that address is expected to have. *)
    ; waiting_content : Hash.t Addr.Table.t
468
    (* Filled with [`Ok] by [all_done] once the tree root matches the
       target. *)
    ; mutable validity_listener :
469
        [ `Ok | `Target_changed of Root_hash.t option * Root_hash.t ] Ivar.t
470
    ; context : (module CONTEXT)
471
    }
472

473
  (* Placeholder for the [t_of_sexp] required by [@@deriving sexp] in the
     signature: fails as soon as it is applied to its first argument. *)
  let t_of_sexp = fun _ -> failwith "t_of_sexp: not implemented"
×
474

475
  (* Placeholder for the [sexp_of_t] required by [@@deriving sexp] in the
     signature: fails as soon as it is applied to its first argument. *)
  let sexp_of_t = fun _ -> failwith "sexp_of_t: not implemented"
×
476

UNCOV
477
  (* Returns the root hash currently targeted; raises (through
     [Option.value_exn]) when no target has been set. *)
  let desired_root_exn t = Option.value_exn t.desired_root
×
478

479
  (* Shuts the syncer down by closing its read end of the answer pipe and then
     its read end of the query pipe. *)
  let destroy { answers; query_reader; _ } =
    Linear_pipe.close_read answers ;
    Linear_pipe.close_read query_reader
×
482

UNCOV
483
  (* Accessor for the writer on which answers are handed to the syncer. *)
  let answer_writer { answer_writer; _ } = answer_writer
×
484

UNCOV
485
  (* Accessor for the reader of (root hash, query) pairs. *)
  let query_reader { query_reader; _ } = query_reader
×
486

487
  (* Records that the children of [parent_addr] have been asked for and that
     they must merge to [expected]. Uses [add_exn], so registering the same
     address twice raises. *)
  let expect_children (t : 'a t) (parent_addr : Addr.t) (expected : Hash.t) :
      unit =
    let open (val t.context) in
    [%log trace]
      ~metadata:
        [ ("parent_address", Addr.to_yojson parent_addr)
        ; ("hash", Hash.to_yojson expected)
        ]
      "Expecting children parent $parent_address, expected: $hash" ;
    let pending = t.waiting_parents in
    Addr.Table.add_exn pending ~key:parent_addr ~data:expected
×
497

498
  (* Records that the accounts under [addr] have been asked for and that the
     subtree at [addr] must hash to [expected]. Uses [add_exn], so registering
     the same address twice raises. *)
  let expect_content (t : 'a t) (addr : Addr.t) (expected : Hash.t) : unit =
    let open (val t.context) in
    [%log trace]
      ~metadata:
        [ ("address", Addr.to_yojson addr); ("hash", Hash.to_yojson expected) ]
      "Expecting content addr $address, expected: $hash" ;
    let pending = t.waiting_content in
    Addr.Table.add_exn pending ~key:addr ~data:expected
×
506

507
  (** [add_content t addr accounts] writes [accounts] into the subtree of
      [t.tree] rooted at [addr], stops waiting for [addr], and then compares
      the inner hash now found at [addr] with the hash that had been expected
      for it. Uses [find_exn], so it raises when [addr] is not currently in
      [t.waiting_content]. *)
  let add_content :
         'a t
      -> Addr.t
      -> Account.t list
      -> [ `Success
         | `Hash_mismatch of Hash.t * Hash.t  (** expected hash, actual *) ] =
   fun t addr accounts ->
    let open (val t.context) in
    let expected_hash = Addr.Table.find_exn t.waiting_content addr in
    (* TODO #444 should we batch all the updates and do them at the end? *)
    (* The data written here may turn out to be wrong; in that case the
       address is requeued by the caller and overwritten later. *)
    MT.set_all_accounts_rooted_at_exn t.tree addr accounts ;
    Addr.Table.remove t.waiting_content addr ;
    [%log trace]
      ~metadata:
        [ ("address", Addr.to_yojson addr)
        ; ("hash", Hash.to_yojson expected_hash)
        ]
      "Found content addr $address, with hash $hash, removing from waiting \
       content" ;
    let actual_hash = MT.get_inner_hash_at_addr_exn t.tree addr in
    match Hash.equal actual_hash expected_hash with
    | true ->
        `Success
    | false ->
        `Hash_mismatch (expected_hash, actual_hash)
×
531

532
  (* Merges every pair of adjacent nodes (0 with 1, 2 with 3, ...) using
     [Hash.merge ~height], producing an array half as long. Fails with
     "length must be even" on an odd-length input. *)
  let merge_siblings (nodes : Hash.t array) (height : index) : Hash.t array =
    let node_count = Array.length nodes in
    if node_count mod 2 <> 0 then failwith "length must be even" ;
    Array.init (node_count / 2) ~f:(fun pair_idx ->
        let left = nodes.(2 * pair_idx) in
        let right = nodes.((2 * pair_idx) + 1) in
        Hash.merge ~height left right )
540

541
  (* [merge_many nodes height subtree_depth] folds [nodes] -- the leaves of a
     subtree whose root sits at [height] and whose leaves are [subtree_depth]
     levels below it -- into the hash of that root. Adjacent pairs are merged
     level by level, starting at height [height - subtree_depth] and adding one
     per level, until a single hash is left.

     The number of nodes is assumed to be a power of 2. An odd intermediate
     length fails inside [merge_siblings]; an empty array fails here, because
     halving it would never reach a single node and the loop would not
     terminate. *)
  let merge_many : Hash.t array -> index -> index -> Hash.t =
   fun nodes height subtree_depth ->
    let rec merge_level level_nodes level_height =
      match Array.length level_nodes with
      | 0 ->
          failwith "merge_many: expected a non-empty array of nodes"
      | 1 ->
          level_nodes.(0)
      | _ ->
          let merged = merge_siblings level_nodes level_height in
          merge_level merged (level_height + 1)
    in
    let bottom_height = height - subtree_depth in
    merge_level nodes bottom_height
×
557

558
  (* [add_subtree t addr nodes requested_depth] checks the hashes [nodes] a
     peer sent as the [2^k] leaves of the subtree rooted at [addr].

     Returns:
     - [`Invalid_length] when the number of hashes is smaller than 2, is not a
       power of 2, or implies a depth [k] greater than [requested_depth];
     - [`Hash_mismatch (expected, merged)] when the hashes do not merge to the
       hash expected for [addr];
     - [`Good to_fetch] otherwise, after [addr] has been removed from
       [t.waiting_parents]. [to_fetch] holds the (address, hash) pairs whose
       hash differs from what [t.tree] currently has at that address, i.e. the
       nodes that must be checked next.

     Raises (message "Forgot to wait for a node") if [addr] is not in
     [t.waiting_parents].

     The length checks run before [Int.is_pow2] and [Int.ceil_log2] because
     both reject non-positive input, and [nodes] is peer-supplied and may be
     empty. *)
  let add_subtree :
         'a t
      -> Addr.t
      -> Hash.t array
      -> int
      -> [ `Good of (Addr.t * Hash.t) array
         | `Hash_mismatch of Hash.t * Hash.t
         | `Invalid_length ] =
   fun t addr nodes requested_depth ->
    let len = Array.length nodes in
    if len < 2 || not (Int.is_pow2 len) then `Invalid_length
    else
      let subtree_depth = Int.ceil_log2 len in
      if subtree_depth > requested_depth then `Invalid_length
      else
        let ledger_depth = MT.depth t.tree in
        let expected =
          Option.value_exn ~message:"Forgot to wait for a node"
            (Addr.Table.find t.waiting_parents addr)
        in
        let merged =
          merge_many nodes (ledger_depth - Addr.depth addr) subtree_depth
        in
        if Hash.equal expected merged then (
          Addr.Table.remove t.waiting_parents addr ;
          let addresses = intermediate_range ledger_depth addr subtree_depth in
          let addresses_and_hashes = Array.zip_exn addresses nodes in

          (* Filter to fetch only those that differ *)
          let should_fetch_children addr hash =
            not @@ Hash.equal (MT.get_inner_hash_at_addr_exn t.tree addr) hash
          in
          let subtrees_to_fetch =
            addresses_and_hashes
            |> Array.filter ~f:(Tuple2.uncurry should_fetch_children)
          in
          `Good subtrees_to_fetch )
        else `Hash_mismatch (expected, merged)
×
601

602
  (* Called when syncing is believed complete. Fails if the merkle root of
     [t.tree] is not the targeted root; otherwise fills [t.validity_listener]
     with [`Ok], logging an error first if the ivar is already full. *)
  let all_done t =
    let open (val t.context) in
    let reached_target =
      Root_hash.equal (MT.merkle_root t.tree) (desired_root_exn t)
    in
    if reached_target then (
      if Ivar.is_full t.validity_listener then
        [%log error] "Ivar.fill bug is here!" ;
      Ivar.fill t.validity_listener `Ok )
    else failwith "We finished syncing, but made a mistake somewhere :("
×
610

611
  (** [empty_hash_at_height h] is the hash of an empty tree of height [h]:
      starting from [Hash.empty_account] at level 0, the hash of each level is
      merged with itself until level [h] is reached. *)
  let empty_hash_at_height h =
    let rec climb level acc =
      match level = h with
      | true ->
          acc
      | false ->
          climb (level + 1) (Hash.merge ~height:level acc acc)
    in
    climb 0 Hash.empty_account
617

618
  (** [complete_with_empties hash start_height result_height] computes the
      hash of a tree of height [result_height] whose left-most subtree of
      height [start_height] hashes to [hash] and which is empty everywhere
      else: at every level the running hash is merged with the empty hash of
      that level as its right sibling. *)
  let complete_with_empties hash start_height result_height =
    let rec extend ~empty_sibling ~subtree_hash level =
      if level = result_height then subtree_hash
      else
        let parent_hash =
          Hash.merge ~height:level subtree_hash empty_sibling
        in
        let parent_empty =
          Hash.merge ~height:level empty_sibling empty_sibling
        in
        extend ~empty_sibling:parent_empty ~subtree_hash:parent_hash (level + 1)
    in
    let initial_empty = empty_hash_at_height start_height in
    extend ~empty_sibling:initial_empty ~subtree_hash:hash start_height
×
630

631
  (** [handle_node t addr exp_hash] starts fetching what lies below [addr],
      whose subtree is expected to hash to [exp_hash]. When [addr] is at depth
      [MT.depth t.tree - account_subtree_height] or deeper, the accounts are
      requested with [What_contents]; otherwise the child hashes are requested
      with [What_child_hashes] at the configured default subtree depth. The
      expectation is registered first, then the query is written to
      [t.queries] together with the current target root. *)
  let handle_node t addr exp_hash =
    let open (val t.context) in
    let contents_threshold = MT.depth t.tree - account_subtree_height in
    let query : query =
      if Addr.depth addr >= contents_threshold then (
        expect_content t addr exp_hash ;
        Query.What_contents addr )
      else (
        expect_children t addr exp_hash ;
        Query.What_child_hashes
          (addr, ledger_sync_config.default_subtree_depth) )
    in
    Linear_pipe.write_without_pushback_if_open t.queries
      (desired_root_exn t, query)
645

646
  (** Handle the initial Num_accounts message, starting the main syncing
      process.

      [n] is the number of accounts claimed by the peer and [content_hash] the
      claimed hash of the smallest subtree containing them. The root hash
      implied by the claim is computed by completing [content_hash] with empty
      subtrees up to the ledger depth. If it equals the target root, both
      waiting tables are cleared, syncing starts from the root address and
      [`Success] is returned; otherwise [`Hash_mismatch (expected, actual)].

      Both values come from a peer, so nonsensical counts are tolerated:
      - a count below 1 is treated like 1, since [Int.ceil_log2] rejects
        non-positive input;
      - a count whose subtree height exceeds the ledger depth is reported as a
        mismatch, with [content_hash] as the "actual" hash, because
        [complete_with_empties] would never reach the ledger depth when started
        above it. *)
  let handle_num_accounts :
      'a t -> int -> Hash.t -> [ `Success | `Hash_mismatch of Hash.t * Hash.t ]
      =
   fun t n content_hash ->
    let rh = Root_hash.to_hash (desired_root_exn t) in
    let ledger_depth = MT.depth t.tree in
    let height = Int.ceil_log2 (Int.max n 1) in
    (* FIXME: bug when height=0 https://github.com/o1-labs/nanobit/issues/365 *)
    if height > ledger_depth then `Hash_mismatch (rh, content_hash)
    else
      let actual = complete_with_empties content_hash height ledger_depth in
      if Hash.equal actual rh then (
        Addr.Table.clear t.waiting_parents ;
        (* We should use this information to set the empty account slots empty and
           start syncing at the content root. See #1972. *)
        Addr.Table.clear t.waiting_content ;
        handle_node t (Addr.root ()) rh ;
        `Success )
      else `Hash_mismatch (rh, actual)
×
664

665
  let main_loop t =
UNCOV
666
    let open (val t.context) in
×
667
    let handle_answer :
668
           Root_hash.t
669
           * Addr.t Query.t
670
           * (Hash.t, Account.t) Answer.t Envelope.Incoming.t
671
        -> unit Deferred.t =
672
     fun (root_hash, query, env) ->
673
      (* NOTE: think about synchronization here. This is deferred now, so
674
         the t and the underlying ledger can change while processing is
675
         happening. *)
UNCOV
676
      let already_done =
×
677
        match Ivar.peek t.validity_listener with Some `Ok -> true | _ -> false
×
678
      in
679
      let sender = Envelope.Incoming.sender env in
UNCOV
680
      let answer = Envelope.Incoming.data env in
×
UNCOV
681
      [%log trace]
×
682
        ~metadata:
UNCOV
683
          [ ("root_hash", Root_hash.to_yojson root_hash)
×
UNCOV
684
          ; ("query", Query.to_yojson Addr.to_yojson query)
×
685
          ]
686
        "Handle answer for $root_hash" ;
687
      if not (Root_hash.equal root_hash (desired_root_exn t)) then (
×
688
        [%log trace]
×
689
          ~metadata:
690
            [ ("desired_hash", Root_hash.to_yojson (desired_root_exn t))
×
691
            ; ("ignored_hash", Root_hash.to_yojson root_hash)
×
692
            ]
693
          "My desired root was $desired_hash, so I'm ignoring $ignored_hash" ;
694
        Deferred.unit )
×
695
      else if already_done then (
×
696
        (* This can happen if we asked for hashes that turn out to be equal in
697
           underlying ledger and the target. *)
698
        [%log debug] "Got sync response when we're already finished syncing" ;
×
699
        Deferred.unit )
×
700
      else
UNCOV
701
        let open Trust_system in
×
702
        (* If a peer misbehaves we still need the information we asked them for,
703
           so requeue in that case. *)
704
        let requeue_query () =
705
          Linear_pipe.write_without_pushback_if_open t.queries (root_hash, query)
×
706
        in
707
        let credit_fulfilled_request () =
UNCOV
708
          record_envelope_sender t.trust_system logger sender
×
709
            ( Actions.Fulfilled_request
710
            , Some
711
                ( "sync ledger query $query"
UNCOV
712
                , [ ("query", Query.to_yojson Addr.to_yojson query) ] ) )
×
713
        in
714
        let%bind _ =
715
          match (query, answer) with
UNCOV
716
          | Query.What_contents addr, Answer.Contents_are leaves -> (
×
717
              match add_content t addr leaves with
UNCOV
718
              | `Success ->
×
UNCOV
719
                  credit_fulfilled_request ()
×
720
              | `Hash_mismatch (expected, actual) ->
×
721
                  let%map () =
722
                    record_envelope_sender t.trust_system logger sender
×
723
                      ( Actions.Sent_bad_hash
724
                      , Some
725
                          ( "sent accounts $accounts for address $addr, they \
726
                             hash to $actual but we expected $expected"
727
                          , [ ( "accounts"
728
                              , `List (List.map ~f:Account.to_yojson leaves) )
×
729
                            ; ("addr", Addr.to_yojson addr)
×
730
                            ; ("actual", Hash.to_yojson actual)
×
731
                            ; ("expected", Hash.to_yojson expected)
×
732
                            ] ) )
733
                  in
734
                  requeue_query () )
×
UNCOV
735
          | Query.Num_accounts, Answer.Num_accounts (count, content_root) -> (
×
736
              match handle_num_accounts t count content_root with
UNCOV
737
              | `Success ->
×
UNCOV
738
                  credit_fulfilled_request ()
×
739
              | `Hash_mismatch (expected, actual) ->
×
740
                  let%map () =
741
                    record_envelope_sender t.trust_system logger sender
×
742
                      ( Actions.Sent_bad_hash
743
                      , Some
744
                          ( "Claimed num_accounts $count, content root hash \
745
                             $content_root_hash, that implies a root hash of \
746
                             $actual, we expected $expected"
747
                          , [ ("count", `Int count)
748
                            ; ("content_root_hash", Hash.to_yojson content_root)
×
749
                            ; ("actual", Hash.to_yojson actual)
×
750
                            ; ("expected", Hash.to_yojson expected)
×
751
                            ] ) )
752
                  in
753
                  requeue_query () )
×
UNCOV
754
          | ( Query.What_child_hashes (address, requested_depth)
×
755
            , Answer.Child_hashes_are hashes ) -> (
756
              match add_subtree t address hashes requested_depth with
757
              | `Hash_mismatch (expected, actual) ->
×
758
                  let%map () =
759
                    record_envelope_sender t.trust_system logger sender
×
760
                      ( Actions.Sent_bad_hash
761
                      , Some
762
                          ( "hashes sent for subtree on address $address merge \
763
                             to $actual_merge but we expected $expected_merge"
764
                          , [ ("actual_merge", Hash.to_yojson actual)
×
765
                            ; ("expected_merge", Hash.to_yojson expected)
×
766
                            ] ) )
767
                  in
768
                  requeue_query ()
×
769
              | `Invalid_length ->
×
770
                  let%map () =
771
                    record_envelope_sender t.trust_system logger sender
×
772
                      ( Actions.Sent_bad_hash
773
                      , Some
774
                          ( "hashes sent for subtree on address $address must \
775
                             be a power of 2 in the range 2-2^$depth"
776
                          , [ ( "depth"
777
                              , `Int ledger_sync_config.max_subtree_depth )
778
                            ] ) )
779
                  in
780
                  requeue_query ()
×
UNCOV
781
              | `Good children_to_verify ->
×
782
                  Array.iter children_to_verify ~f:(fun (addr, hash) ->
UNCOV
783
                      handle_node t addr hash ) ;
×
UNCOV
784
                  credit_fulfilled_request () )
×
785
          | query, answer ->
×
786
              let%map () =
787
                record_envelope_sender t.trust_system logger sender
×
788
                  ( Actions.Violated_protocol
789
                  , Some
790
                      ( "Answered question we didn't ask! Query was $query \
791
                         answer was $answer"
792
                      , [ ("query", Query.to_yojson Addr.to_yojson query)
×
793
                        ; ( "answer"
794
                          , Answer.to_yojson Hash.to_yojson Account.to_yojson
×
795
                              answer )
796
                        ] ) )
797
              in
798
              requeue_query ()
×
799
        in
UNCOV
800
        if
×
801
          Root_hash.equal
UNCOV
802
            (Option.value_exn t.desired_root)
×
UNCOV
803
            (MT.merkle_root t.tree)
×
UNCOV
804
        then (
×
UNCOV
805
          [%str_log trace] Snarked_ledger_synced ;
×
UNCOV
806
          all_done t ) ;
×
UNCOV
807
        Deferred.unit
×
808
    in
809
    Linear_pipe.iter t.answers ~f:handle_answer
810

811
  (** [new_goal t h ~data ~equal] points the sync at root hash [h], with
      [data] as the accompanying auxiliary data.

      - [`New]: [t] had no target, or a target different from [h]. The current
        validity listener is filled with [`Target_changed (old_target, h)]
        (unless already filled) and replaced by a fresh one, the target and
        auxiliary data are overwritten, and a [Num_accounts] query for [h] is
        written to the query pipe if that pipe is open.
      - [`Repeat]: [h] is already the target and the stored auxiliary data is
        [equal] to [data]; nothing is modified.
      - [`Update_data]: [h] is already the target but the stored auxiliary
        data is absent or not [equal] to [data]; only the auxiliary data is
        replaced. *)
  let new_goal t h ~data ~equal =
    (* NOTE(review): presumably this is what brings [logger] into scope for
       the [%log] calls below; confirm against the context signature. *)
    let open (val t.context) in
    let previous_root = t.desired_root in
    let same_target =
      match previous_root with
      | Some current ->
          Root_hash.equal h current
      | None ->
          false
    in
    if same_target then (
      let same_data =
        match t.auxiliary_data with
        | Some saved_data ->
            equal data saved_data
        | None ->
            false
      in
      if same_data then (
        [%log debug] "New_goal to same hash, not doing anything" ;
        `Repeat )
      else (
        t.auxiliary_data <- Some data ;
        `Update_data ) )
    else (
      ( match previous_root with
      | Some root_hash ->
          [%log debug]
            ~metadata:
              [ ("old_root_hash", Root_hash.to_yojson root_hash)
              ; ("new_root_hash", Root_hash.to_yojson h)
              ]
            "New_goal: changing target from $old_root_hash to $new_root_hash"
      | None ->
          () ) ;
      (* Notify whoever is waiting on the old listener first, then install a
         fresh listener for the new target. *)
      Ivar.fill_if_empty t.validity_listener
        (`Target_changed (previous_root, h)) ;
      t.validity_listener <- Ivar.create () ;
      t.desired_root <- Some h ;
      t.auxiliary_data <- Some data ;
      Linear_pipe.write_without_pushback_if_open t.queries (h, Num_accounts) ;
      `New )
844

845
  (** [valid_tree t] waits on [t]'s validity listener. On [`Ok] it resolves to
      the tree paired with the stored auxiliary data. On [`Target_changed] it
      waits again, on whichever listener [t] holds at that point.

      [Option.value_exn] raises if no auxiliary data is stored when [`Ok]
      arrives. *)
  let rec valid_tree t =
    let%bind verdict = Ivar.read t.validity_listener in
    match verdict with
    | `Target_changed _ ->
        valid_tree t
    | `Ok ->
        return (t.tree, Option.value_exn t.auxiliary_data)
851

852
  (** [peek_valid_tree t] inspects the validity listener without waiting:
      [Some t.tree] when it is already filled with [`Ok], and [None] when it
      is still empty or was filled with [`Target_changed]. *)
  let peek_valid_tree t =
    match Ivar.peek t.validity_listener with
    | Some `Ok ->
        Some t.tree
    | Some (`Target_changed _) | None ->
        None
858

859
  (** [wait_until_valid t h] waits for the sync towards [h] to finish.

      If [h] is not the current target, it resolves immediately to
      [`Target_changed (t.desired_root, h)]. Otherwise it waits for the
      validity listener and resolves to [`Ok t.tree], or passes through the
      [`Target_changed] payload the listener was filled with.

      NOTE(review): [desired_root_exn] is defined elsewhere; presumably it
      raises when no target has been set. Confirm. *)
  let wait_until_valid t h =
    (* Re-tag the listener's outcome, attaching the tree on success. *)
    let with_tree = function
      | `Ok ->
          `Ok t.tree
      | `Target_changed payload ->
          `Target_changed payload
    in
    if Root_hash.equal h (desired_root_exn t) then
      Deferred.map (Ivar.read t.validity_listener) ~f:with_tree
    else return (`Target_changed (t.desired_root, h))
868

869
  (** [fetch t rh ~data ~equal] sets [rh] as the goal via [new_goal] (the
      returned tag is discarded) and then waits with [wait_until_valid]. *)
  let fetch t rh ~data ~equal =
    let (_ : [ `New | `Repeat | `Update_data ]) = new_goal t rh ~data ~equal in
    wait_until_valid t rh
872

873
  (** [create mt ~context ~trust_system] builds a sync state around the tree
      [mt]: no target root, no auxiliary data, fresh query and answer pipes,
      empty waiting tables and an unfilled validity listener. It starts
      [main_loop] on the new state in the background and returns the state. *)
  let create mt ~context ~trust_system =
    let query_reader, queries = Linear_pipe.create () in
    let answers, answer_writer = Linear_pipe.create () in
    let state =
      { desired_root = None
      ; auxiliary_data = None
      ; validity_listener = Ivar.create ()
      ; tree = mt
      ; trust_system
      ; context
      ; queries
      ; query_reader
      ; answers
      ; answer_writer
      ; waiting_parents = Addr.Table.create ()
      ; waiting_content = Addr.Table.create ()
      }
    in
    (* The loop runs for the lifetime of the answer pipe; don't block on it. *)
    don't_wait_for (main_loop state) ;
    state
×
893

894
  (** Unimplemented placeholder: ignores both arguments and always raises
      [Failure "todo"]. *)
  let apply_or_queue_diff _t _diff =
    (* Need some interface for the diffs, not sure the layering is right here. *)
    failwith "todo"
×
897

898
  (** Not provided by this module: ignores its argument and always raises
      [Failure "no"]. *)
  let merkle_path_at_addr _addr = failwith "no"
×
899

900
  (** Not provided by this module: ignores its argument and always raises
      [Failure "no"]. *)
  let get_account_at_addr _addr = failwith "no"
×
901
end
162✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc