• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

MinaProtocol / mina / 358

13 Jun 2025 03:53PM UTC coverage: 31.724% (-15.4%) from 47.137%
358

push

buildkite

web-flow
Merge pull request #17410 from MinaProtocol/georgeee/synchronize-develop-compatible

Synchronize `develop` and `compatible` branches

33 of 352 new or added lines in 20 files covered. (9.38%)

18725 existing lines in 383 files now uncovered.

22485 of 70878 relevant lines covered (31.72%)

27782.14 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

3.44
/src/lib/syncable_ledger/syncable_ledger.ml
1
open Core_kernel
22✔
2
open Async_kernel
3
open Pipe_lib
4
open Network_peer
5

6
type Structured_log_events.t += Snarked_ledger_synced
UNCOV
7
  [@@deriving register_event { msg = "Snarked database sync'd. All done" }]
×
8

9
(** Run f recursively n times, starting with value r.
10
    e.g. funpow 3 f r = f (f (f r)) *)
UNCOV
11
let rec funpow n f r = if n > 0 then funpow (n - 1) f (f r) else r
×
12

13
module Query = struct
14
  [%%versioned
15
  module Stable = struct
16
    module V2 = struct
17
      type 'addr t =
22✔
18
        | What_child_hashes of 'addr * int
×
19
            (** What are the hashes of the children of this address? 
20
            If depth > 1 then we get the leaves of a subtree rooted
21
            at address and of the given depth. 
22
            For depth = 1 we have the simplest case with just the 2
23
            direct children.
24
            *)
UNCOV
25
        | What_contents of 'addr
×
26
            (** What accounts are at this address? addr must have depth
27
            tree_depth - account_subtree_height *)
UNCOV
28
        | Num_accounts
×
29
            (** How many accounts are there? Used to size data structure and
30
            figure out what part of the tree is filled in. *)
31
      [@@deriving sexp, yojson, hash, compare]
66✔
32
    end
33

34
    module V1 = struct
35
      type 'addr t =
22✔
36
        | What_child_hashes of 'addr
×
37
            (** What are the hashes of the children of this address? *)
38
        | What_contents of 'addr
×
39
            (** What accounts are at this address? addr must have depth
40
            tree_depth - account_subtree_height *)
41
        | Num_accounts
×
42
            (** How many accounts are there? Used to size data structure and
43
            figure out what part of the tree is filled in. *)
44
      [@@deriving sexp, yojson, hash, compare]
66✔
45

46
      let to_latest : 'a t -> 'a V2.t = function
47
        | What_child_hashes a ->
×
48
            What_child_hashes (a, 1)
49
        | What_contents a ->
×
50
            What_contents a
51
        | Num_accounts ->
×
52
            Num_accounts
53
    end
54
  end]
55
end
56

57
module Answer = struct
58
  [%%versioned
59
  module Stable = struct
60
    module V2 = struct
61
      type ('hash, 'account) t =
22✔
62
        | Child_hashes_are of 'hash Bounded_types.ArrayN4000.Stable.V1.t
×
63
            (** The requested addresses' children have these hashes.
64
            May be any power of 2 number of children, and not necessarily 
65
            immediate children  *)
66
        | Contents_are of 'account list
×
67
            (** The requested address has these accounts *)
68
        | Num_accounts of int * 'hash
×
69
            (** There are this many accounts and the smallest subtree that
70
                contains all non-empty nodes has this hash. *)
71
      [@@deriving sexp, yojson]
66✔
72
    end
73

74
    module V1 = struct
75
      type ('hash, 'account) t =
22✔
76
        | Child_hashes_are of 'hash * 'hash
×
77
            (** The requested address's children have these hashes **)
78
        | Contents_are of 'account list
×
79
            (** The requested address has these accounts *)
80
        | Num_accounts of int * 'hash
×
81
            (** There are this many accounts and the smallest subtree that
82
                contains all non-empty nodes has this hash. *)
83
      [@@deriving sexp, yojson]
66✔
84

85
      let to_latest acct_to_latest = function
86
        | Child_hashes_are (h1, h2) ->
×
87
            V2.Child_hashes_are [| h1; h2 |]
88
        | Contents_are accts ->
×
89
            V2.Contents_are (List.map ~f:acct_to_latest accts)
×
90
        | Num_accounts (i, h) ->
×
91
            V2.Num_accounts (i, h)
92

93
      (* Not a standard versioning function *)
94

95
      (** Attempts to downgrade v2 -> v1 *)
96
      let from_v2 : ('a, 'b) V2.t -> ('a, 'b) t Or_error.t = function
97
        | Child_hashes_are h ->
×
98
            if Array.length h = 2 then Ok (Child_hashes_are (h.(0), h.(1)))
×
99
            else Or_error.error_string "can't downgrade wide query"
×
100
        | Contents_are accs ->
×
101
            Ok (Contents_are accs)
102
        | Num_accounts (n, h) ->
×
103
            Ok (Num_accounts (n, h))
104
    end
105
  end]
106
end
107

108
type daemon_config = { max_subtree_depth : int; default_subtree_depth : int }
109

110
let create_config ~(compile_config : Mina_compile_config.t) ~max_subtree_depth
111
    ~default_subtree_depth () =
UNCOV
112
  { max_subtree_depth =
×
UNCOV
113
      Option.value ~default:compile_config.sync_ledger_max_subtree_depth
×
114
        max_subtree_depth
115
  ; default_subtree_depth =
UNCOV
116
      Option.value ~default:compile_config.sync_ledger_default_subtree_depth
×
117
        default_subtree_depth
118
  }
119

120
module type CONTEXT = sig
121
  val logger : Logger.t
122

123
  val ledger_sync_config : daemon_config
124
end
125

126
module type Inputs_intf = sig
127
  module Addr : module type of Merkle_address
128

129
  module Account : sig
130
    type t [@@deriving bin_io, sexp, yojson]
131
  end
132

133
  module Hash : Merkle_ledger.Intf.Hash with type account := Account.t
134

135
  module Root_hash : sig
136
    type t [@@deriving equal, sexp, yojson]
137

138
    val to_hash : t -> Hash.t
139
  end
140

141
  module MT :
142
    Merkle_ledger.Intf.SYNCABLE
143
      with type hash := Hash.t
144
       and type root_hash := Root_hash.t
145
       and type addr := Addr.t
146
       and type account := Account.t
147

148
  val account_subtree_height : int
149
end
150

151
module type S = sig
152
  type 'a t [@@deriving sexp]
153

154
  type merkle_tree
155

156
  type merkle_path
157

158
  type hash
159

160
  type root_hash
161

162
  type addr
163

164
  type diff
165

166
  type account
167

168
  type index = int
169

170
  type query
171

172
  type answer
173

174
  module Responder : sig
175
    type t
176

177
    val create :
178
         merkle_tree
179
      -> (query -> unit)
180
      -> context:(module CONTEXT)
181
      -> trust_system:Trust_system.t
182
      -> t
183

184
    val answer_query :
185
      t -> query Envelope.Incoming.t -> answer Or_error.t Deferred.t
186
  end
187

188
  val create :
189
       merkle_tree
190
    -> context:(module CONTEXT)
191
    -> trust_system:Trust_system.t
192
    -> 'a t
193

194
  val answer_writer :
195
       'a t
196
    -> (root_hash * query * answer Envelope.Incoming.t) Linear_pipe.Writer.t
197

198
  val query_reader : 'a t -> (root_hash * query) Linear_pipe.Reader.t
199

200
  val destroy : 'a t -> unit
201

202
  val new_goal :
203
       'a t
204
    -> root_hash
205
    -> data:'a
206
    -> equal:('a -> 'a -> bool)
207
    -> [ `Repeat | `New | `Update_data ]
208

209
  val peek_valid_tree : 'a t -> merkle_tree option
210

211
  val valid_tree : 'a t -> (merkle_tree * 'a) Deferred.t
212

213
  val wait_until_valid :
214
       'a t
215
    -> root_hash
216
    -> [ `Ok of merkle_tree | `Target_changed of root_hash option * root_hash ]
217
       Deferred.t
218

219
  val fetch :
220
       'a t
221
    -> root_hash
222
    -> data:'a
223
    -> equal:('a -> 'a -> bool)
224
    -> [ `Ok of merkle_tree | `Target_changed of root_hash option * root_hash ]
225
       Deferred.t
226

227
  val apply_or_queue_diff : 'a t -> diff -> unit
228

229
  val merkle_path_at_addr : 'a t -> addr -> merkle_path Or_error.t
230

231
  val get_account_at_addr : 'a t -> addr -> account Or_error.t
232
end
233

234
(*
235

236
Every node of the merkle tree is always in one of three states:
237

238
- Fresh.
239
  The current contents for this node in the MT match what we
240
  expect.
241
- Stale
242
  The current contents for this node in the MT do _not_ match
243
  what we expect.
244
- Unknown.
245
  We don't know what to expect yet.
246

247

248
Although every node conceptually has one of these states, and can
249
make a transition at any time, the syncer operates only along a
250
"frontier" of the tree, which consists of the deepest Stale nodes.
251

252
The goal of the ledger syncer is to make the root node be fresh,
253
starting from it being stale.
254

255
The syncer usually operates exclusively on these frontier nodes
256
and their direct children. However, the goal hash can change
257
while the syncer is running, and at that point every non-root node
258
conceptually becomes Unknown, and we need to restart. However, we
259
don't need to restart completely: in practice, only small portions
260
of the merkle tree change between goals, and we can re-use the "Stale"
261
nodes we already have if the expected hash doesn't change.
262

263
*)
264
(*
265
Note: while syncing, the underlying ledger is in an
266
indeterminate state. We're mutating hashes at internal
267
nodes without updating their children. In fact, we
268
don't even set all the hashes for the internal nodes!
269
(When we hit a height=N subtree, we don't do anything
270
with the hashes in the bottomost N-1 internal nodes).
271
*)
272

273
module Make (Inputs : Inputs_intf) : sig
274
  open Inputs
275

276
  include
277
    S
278
      with type merkle_tree := MT.t
279
       and type hash := Hash.t
280
       and type root_hash := Root_hash.t
281
       and type addr := Addr.t
282
       and type merkle_path := MT.path
283
       and type account := Account.t
284
       and type query := Addr.t Query.t
285
       and type answer := (Hash.t, Account.t) Answer.t
286
end = struct
287
  open Inputs
288

289
  type diff = unit
290

291
  type index = int
292

293
  type answer = (Hash.t, Account.t) Answer.t
294

295
  type query = Addr.t Query.t
296

297
  (* Provides addresses at an specific depth from this address *)
298
  let intermediate_range ledger_depth addr i =
UNCOV
299
    Array.init (1 lsl i) ~f:(fun idx ->
×
UNCOV
300
        Addr.extend_exn ~ledger_depth addr ~num_bits:i (Int64.of_int idx) )
×
301

302
  module Responder = struct
303
    type t =
304
      { mt : MT.t
305
      ; f : query -> unit
306
      ; context : (module CONTEXT)
307
      ; trust_system : Trust_system.t
308
      }
309

310
    let create :
311
           MT.t
312
        -> (query -> unit)
313
        -> context:(module CONTEXT)
314
        -> trust_system:Trust_system.t
315
        -> t =
UNCOV
316
     fun mt f ~context ~trust_system -> { mt; f; context; trust_system }
×
317

318
    let answer_query :
319
        t -> query Envelope.Incoming.t -> answer Or_error.t Deferred.t =
320
     fun { mt; f; context; trust_system } query_envelope ->
UNCOV
321
      let open (val context) in
×
322
      let open Trust_system in
323
      let ledger_depth = MT.depth mt in
UNCOV
324
      let sender = Envelope.Incoming.sender query_envelope in
×
UNCOV
325
      let query = Envelope.Incoming.data query_envelope in
×
UNCOV
326
      f query ;
×
UNCOV
327
      let response_or_punish =
×
328
        match query with
UNCOV
329
        | What_contents a ->
×
UNCOV
330
            if Addr.height ~ledger_depth a > account_subtree_height then
×
331
              Either.Second
×
332
                ( Actions.Violated_protocol
333
                , Some
334
                    ( "Requested too big of a subtree at once"
335
                    , [ ("addr", Addr.to_yojson a) ] ) )
×
336
            else
UNCOV
337
              let addresses_and_accounts =
×
338
                List.sort ~compare:(fun (addr1, _) (addr2, _) ->
UNCOV
339
                    Addr.compare addr1 addr2 )
×
UNCOV
340
                @@ MT.get_all_accounts_rooted_at_exn mt a
×
341
                (* can't actually throw *)
342
              in
UNCOV
343
              let addresses, accounts = List.unzip addresses_and_accounts in
×
UNCOV
344
              if List.is_empty addresses then
×
345
                (* Peer should know what portions of the tree are full from the
346
                   Num_accounts query. *)
347
                Either.Second
×
348
                  ( Actions.Violated_protocol
349
                  , Some
350
                      ("Requested empty subtree", [ ("addr", Addr.to_yojson a) ])
×
351
                  )
352
              else
UNCOV
353
                let first_address, rest_address =
×
UNCOV
354
                  (List.hd_exn addresses, List.tl_exn addresses)
×
355
                in
356
                let missing_address, is_compact =
357
                  List.fold rest_address
UNCOV
358
                    ~init:(Addr.next first_address, true)
×
359
                    ~f:(fun (expected_address, is_compact) actual_address ->
UNCOV
360
                      if
×
361
                        is_compact
UNCOV
362
                        && [%equal: Addr.t option] expected_address
×
363
                             (Some actual_address)
UNCOV
364
                      then (Addr.next actual_address, true)
×
365
                      else (expected_address, false) )
×
366
                in
367
                if not is_compact then (
×
368
                  (* indicates our ledger is invalid somehow. *)
369
                  [%log fatal]
×
370
                    ~metadata:
371
                      [ ( "missing_address"
372
                        , Addr.to_yojson (Option.value_exn missing_address) )
×
373
                      ; ( "addresses_and_accounts"
374
                        , `List
375
                            (List.map addresses_and_accounts
×
376
                               ~f:(fun (addr, account) ->
377
                                 `Tuple
×
378
                                   [ Addr.to_yojson addr
×
379
                                   ; Account.to_yojson account
×
380
                                   ] ) ) )
381
                      ]
382
                    "Missing an account at address: $missing_address inside \
383
                     the list: $addresses_and_accounts" ;
384
                  assert false )
×
UNCOV
385
                else Either.First (Answer.Contents_are accounts)
×
UNCOV
386
        | Num_accounts ->
×
387
            let len = MT.num_accounts mt in
UNCOV
388
            let height = Int.ceil_log2 len in
×
389
            (* FIXME: bug when height=0 https://github.com/o1-labs/nanobit/issues/365 *)
UNCOV
390
            let content_root_addr =
×
391
              funpow
UNCOV
392
                (MT.depth mt - height)
×
393
                (fun a ->
UNCOV
394
                  Addr.child_exn ~ledger_depth a Mina_stdlib.Direction.Left )
×
UNCOV
395
                (Addr.root ())
×
396
            in
UNCOV
397
            Either.First
×
398
              (Num_accounts
UNCOV
399
                 (len, MT.get_inner_hash_at_addr_exn mt content_root_addr) )
×
UNCOV
400
        | What_child_hashes (a, subtree_depth) -> (
×
401
            match subtree_depth with
UNCOV
402
            | n when n >= 1 -> (
×
403
                let subtree_depth =
404
                  min n ledger_sync_config.max_subtree_depth
405
                in
UNCOV
406
                let ledger_depth = MT.depth mt in
×
UNCOV
407
                let addresses =
×
408
                  intermediate_range ledger_depth a subtree_depth
409
                in
UNCOV
410
                match
×
411
                  Or_error.try_with (fun () ->
UNCOV
412
                      let get_hash a = MT.get_inner_hash_at_addr_exn mt a in
×
413
                      let hashes = Array.map addresses ~f:get_hash in
UNCOV
414
                      Answer.Child_hashes_are hashes )
×
415
                with
UNCOV
416
                | Ok answer ->
×
417
                    Either.First answer
418
                | Error e ->
×
419
                    [%log error]
×
420
                      ~metadata:[ ("error", Error_json.error_to_yojson e) ]
×
421
                      "When handling What_child_hashes request, the following \
422
                       error happended: $error" ;
423
                    Either.Second
×
424
                      ( Actions.Violated_protocol
425
                      , Some
426
                          ( "Invalid address in What_child_hashes request"
427
                          , [ ("addr", Addr.to_yojson a) ] ) ) )
×
UNCOV
428
            | _ ->
×
UNCOV
429
                [%log error]
×
430
                  "When handling What_child_hashes request, the depth was \
431
                   outside the valid range" ;
UNCOV
432
                Either.Second
×
433
                  ( Actions.Violated_protocol
434
                  , Some
435
                      ( "Invalid depth requested in What_child_hashes request"
UNCOV
436
                      , [ ("addr", Addr.to_yojson a) ] ) ) )
×
437
      in
438

439
      match response_or_punish with
UNCOV
440
      | Either.First answer ->
×
441
          Deferred.return @@ Ok answer
UNCOV
442
      | Either.Second action ->
×
443
          let%map _ =
UNCOV
444
            record_envelope_sender trust_system logger sender action
×
445
          in
UNCOV
446
          let err =
×
UNCOV
447
            Option.value_map ~default:"Violated protocol" (snd action) ~f:fst
×
448
          in
UNCOV
449
          Or_error.error_string err
×
450
  end
451

452
  type 'a t =
453
    { mutable desired_root : Root_hash.t option
454
    ; mutable auxiliary_data : 'a option
455
    ; tree : MT.t
456
    ; trust_system : Trust_system.t
457
    ; answers :
458
        (Root_hash.t * query * answer Envelope.Incoming.t) Linear_pipe.Reader.t
459
    ; answer_writer :
460
        (Root_hash.t * query * answer Envelope.Incoming.t) Linear_pipe.Writer.t
461
    ; queries : (Root_hash.t * query) Linear_pipe.Writer.t
462
    ; query_reader : (Root_hash.t * query) Linear_pipe.Reader.t
463
    ; waiting_parents : Hash.t Addr.Table.t
464
          (** Addresses we are waiting for the children of, and the expected
465
              hash of the node with the address. *)
466
    ; waiting_content : Hash.t Addr.Table.t
467
    ; mutable validity_listener :
468
        [ `Ok | `Target_changed of Root_hash.t option * Root_hash.t ] Ivar.t
469
    ; context : (module CONTEXT)
470
    }
471

472
  let t_of_sexp _ = failwith "t_of_sexp: not implemented"
×
473

474
  let sexp_of_t _ = failwith "sexp_of_t: not implemented"
×
475

UNCOV
476
  let desired_root_exn { desired_root; _ } = desired_root |> Option.value_exn
×
477

478
  let destroy t =
UNCOV
479
    Linear_pipe.close_read t.answers ;
×
UNCOV
480
    Linear_pipe.close_read t.query_reader
×
481

UNCOV
482
  let answer_writer t = t.answer_writer
×
483

UNCOV
484
  let query_reader t = t.query_reader
×
485

486
  let expect_children : 'a t -> Addr.t -> Hash.t -> unit =
487
   fun t parent_addr expected ->
UNCOV
488
    let open (val t.context) in
×
UNCOV
489
    [%log trace]
×
490
      ~metadata:
UNCOV
491
        [ ("parent_address", Addr.to_yojson parent_addr)
×
UNCOV
492
        ; ("hash", Hash.to_yojson expected)
×
493
        ]
494
      "Expecting children parent $parent_address, expected: $hash" ;
UNCOV
495
    Addr.Table.add_exn t.waiting_parents ~key:parent_addr ~data:expected
×
496

497
  let expect_content : 'a t -> Addr.t -> Hash.t -> unit =
498
   fun t addr expected ->
UNCOV
499
    let open (val t.context) in
×
UNCOV
500
    [%log trace]
×
501
      ~metadata:
UNCOV
502
        [ ("address", Addr.to_yojson addr); ("hash", Hash.to_yojson expected) ]
×
503
      "Expecting content addr $address, expected: $hash" ;
UNCOV
504
    Addr.Table.add_exn t.waiting_content ~key:addr ~data:expected
×
505

506
  (** Given an address and the accounts below that address, fill in the tree
507
      with them. *)
508
  let add_content :
509
         'a t
510
      -> Addr.t
511
      -> Account.t list
512
      -> [ `Success
513
         | `Hash_mismatch of Hash.t * Hash.t  (** expected hash, actual *) ] =
514
   fun t addr content ->
UNCOV
515
    let open (val t.context) in
×
516
    let expected = Addr.Table.find_exn t.waiting_content addr in
517
    (* TODO #444 should we batch all the updates and do them at the end? *)
518
    (* We might write the wrong data to the underlying ledger here, but if so
519
       we'll requeue the address and it'll be overwritten. *)
UNCOV
520
    MT.set_all_accounts_rooted_at_exn t.tree addr content ;
×
UNCOV
521
    Addr.Table.remove t.waiting_content addr ;
×
UNCOV
522
    [%log trace]
×
523
      ~metadata:
UNCOV
524
        [ ("address", Addr.to_yojson addr); ("hash", Hash.to_yojson expected) ]
×
525
      "Found content addr $address, with hash $hash, removing from waiting \
526
       content" ;
UNCOV
527
    let actual = MT.get_inner_hash_at_addr_exn t.tree addr in
×
UNCOV
528
    if Hash.equal actual expected then `Success
×
529
    else `Hash_mismatch (expected, actual)
×
530

531
  (* Merges each 2 contigous nodes, halving the size of the array *)
532
  let merge_siblings : Hash.t array -> index -> Hash.t array =
533
   fun nodes height ->
UNCOV
534
    let len = Array.length nodes in
×
535
    if len mod 2 <> 0 then failwith "length must be even" ;
×
UNCOV
536
    let half_len = len / 2 in
×
UNCOV
537
    let f i = Hash.merge ~height nodes.(2 * i) nodes.((2 * i) + 1) in
×
538
    Array.init half_len ~f
539

540
  (* Assumes nodes to be a power of 2 and merges them into their common root *)
541
  let rec merge_many : Hash.t array -> index -> Hash.t =
542
   fun nodes height ->
UNCOV
543
    let len = Array.length nodes in
×
UNCOV
544
    match len with
×
UNCOV
545
    | 1 ->
×
546
        nodes.(0)
UNCOV
547
    | _ ->
×
548
        let half = merge_siblings nodes height in
UNCOV
549
        merge_many half (height + 1)
×
550

551
  let merge_many : Hash.t array -> index -> index -> Hash.t =
552
   fun nodes height subtree_depth ->
UNCOV
553
    let bottom_height = height - subtree_depth in
×
554
    let hash = merge_many nodes bottom_height in
UNCOV
555
    hash
×
556

557
  (* Adds the subtree given as the 2^k subtree leaves with the given prefix address *)
558
  (* Returns next nodes to be checked *)
559
  let add_subtree :
560
         'a t
561
      -> Addr.t
562
      -> Hash.t array
563
      -> int
564
      -> [ `Good of (Addr.t * Hash.t) array
565
         | `Hash_mismatch of Hash.t * Hash.t
566
         | `Invalid_length ] =
567
   fun t addr nodes requested_depth ->
UNCOV
568
    let open (val t.context) in
×
569
    let len = Array.length nodes in
UNCOV
570
    let is_power = Int.is_pow2 len in
×
UNCOV
571
    let is_more_than_two = len >= 2 in
×
572
    let subtree_depth = Int.ceil_log2 len in
UNCOV
573
    let less_than_requested = subtree_depth <= requested_depth in
×
UNCOV
574
    let valid_length = is_power && is_more_than_two && less_than_requested in
×
575
    if valid_length then
UNCOV
576
      let ledger_depth = MT.depth t.tree in
×
UNCOV
577
      let expected =
×
578
        Option.value_exn ~message:"Forgot to wait for a node"
UNCOV
579
          (Addr.Table.find t.waiting_parents addr)
×
580
      in
UNCOV
581
      let merged =
×
UNCOV
582
        merge_many nodes (ledger_depth - Addr.depth addr) subtree_depth
×
583
      in
UNCOV
584
      if Hash.equal expected merged then (
×
585
        Addr.Table.remove t.waiting_parents addr ;
UNCOV
586
        let addresses = intermediate_range ledger_depth addr subtree_depth in
×
UNCOV
587
        let addresses_and_hashes = Array.zip_exn addresses nodes in
×
588

589
        (* Filter to fetch only those that differ *)
UNCOV
590
        let should_fetch_children addr hash =
×
UNCOV
591
          not @@ Hash.equal (MT.get_inner_hash_at_addr_exn t.tree addr) hash
×
592
        in
593
        let subtrees_to_fetch =
594
          addresses_and_hashes
UNCOV
595
          |> Array.filter ~f:(Tuple2.uncurry should_fetch_children)
×
596
        in
UNCOV
597
        `Good subtrees_to_fetch )
×
598
      else `Hash_mismatch (expected, merged)
×
599
    else `Invalid_length
×
600

601
  let all_done t =
UNCOV
602
    let open (val t.context) in
×
UNCOV
603
    if not (Root_hash.equal (MT.merkle_root t.tree) (desired_root_exn t)) then
×
604
      failwith "We finished syncing, but made a mistake somewhere :("
×
UNCOV
605
    else (
×
606
      if Ivar.is_full t.validity_listener then
607
        [%log error] "Ivar.fill bug is here!" ;
×
UNCOV
608
      Ivar.fill t.validity_listener `Ok )
×
609

610
  (** Compute the hash of an empty tree of the specified height. *)
611
  let empty_hash_at_height h =
UNCOV
612
    let rec go prev ctr =
×
UNCOV
613
      if ctr = h then prev else go (Hash.merge ~height:ctr prev prev) (ctr + 1)
×
614
    in
615
    go Hash.empty_account 0
616

617
  (** Given the hash of the smallest subtree that contains all accounts, the
618
      height of that hash in the tree and the height of the whole tree, compute
619
      the hash of the whole tree. *)
620
  let complete_with_empties hash start_height result_height =
UNCOV
621
    let rec go cur_empty prev_hash height =
×
UNCOV
622
      if height = result_height then prev_hash
×
623
      else
UNCOV
624
        let cur = Hash.merge ~height prev_hash cur_empty in
×
UNCOV
625
        let next_empty = Hash.merge ~height cur_empty cur_empty in
×
UNCOV
626
        go next_empty cur (height + 1)
×
627
    in
UNCOV
628
    go (empty_hash_at_height start_height) hash start_height
×
629

630
  (** Given an address and the hash of the corresponding subtree, start getting
631
      the children.
632
  *)
633
  let handle_node t addr exp_hash =
UNCOV
634
    let open (val t.context) in
×
UNCOV
635
    if Addr.depth addr >= MT.depth t.tree - account_subtree_height then (
×
636
      expect_content t addr exp_hash ;
UNCOV
637
      Linear_pipe.write_without_pushback_if_open t.queries
×
UNCOV
638
        (desired_root_exn t, What_contents addr) )
×
UNCOV
639
    else (
×
640
      expect_children t addr exp_hash ;
UNCOV
641
      Linear_pipe.write_without_pushback_if_open t.queries
×
UNCOV
642
        ( desired_root_exn t
×
643
        , What_child_hashes (addr, ledger_sync_config.default_subtree_depth) ) )
644

645
  (** Handle the initial Num_accounts message, starting the main syncing
646
      process. *)
647
  let handle_num_accounts :
648
      'a t -> int -> Hash.t -> [ `Success | `Hash_mismatch of Hash.t * Hash.t ]
649
      =
650
   fun t n content_hash ->
UNCOV
651
    let rh = Root_hash.to_hash (desired_root_exn t) in
×
UNCOV
652
    let height = Int.ceil_log2 n in
×
653
    (* FIXME: bug when height=0 https://github.com/o1-labs/nanobit/issues/365 *)
UNCOV
654
    let actual = complete_with_empties content_hash height (MT.depth t.tree) in
×
UNCOV
655
    if Hash.equal actual rh then (
×
656
      Addr.Table.clear t.waiting_parents ;
657
      (* We should use this information to set the empty account slots empty and
658
         start syncing at the content root. See #1972. *)
UNCOV
659
      Addr.Table.clear t.waiting_content ;
×
UNCOV
660
      handle_node t (Addr.root ()) rh ;
×
UNCOV
661
      `Success )
×
662
    else `Hash_mismatch (rh, actual)
×
663

664
  let main_loop t =
UNCOV
665
    let open (val t.context) in
×
666
    let handle_answer :
667
           Root_hash.t
668
           * Addr.t Query.t
669
           * (Hash.t, Account.t) Answer.t Envelope.Incoming.t
670
        -> unit Deferred.t =
671
     fun (root_hash, query, env) ->
672
      (* NOTE: think about synchronization here. This is deferred now, so
673
         the t and the underlying ledger can change while processing is
674
         happening. *)
UNCOV
675
      let already_done =
×
676
        match Ivar.peek t.validity_listener with Some `Ok -> true | _ -> false
×
677
      in
678
      let sender = Envelope.Incoming.sender env in
UNCOV
679
      let answer = Envelope.Incoming.data env in
×
UNCOV
680
      [%log trace]
×
681
        ~metadata:
UNCOV
682
          [ ("root_hash", Root_hash.to_yojson root_hash)
×
UNCOV
683
          ; ("query", Query.to_yojson Addr.to_yojson query)
×
684
          ]
685
        "Handle answer for $root_hash" ;
686
      if not (Root_hash.equal root_hash (desired_root_exn t)) then (
×
687
        [%log trace]
×
688
          ~metadata:
689
            [ ("desired_hash", Root_hash.to_yojson (desired_root_exn t))
×
690
            ; ("ignored_hash", Root_hash.to_yojson root_hash)
×
691
            ]
692
          "My desired root was $desired_hash, so I'm ignoring $ignored_hash" ;
693
        Deferred.unit )
×
694
      else if already_done then (
×
695
        (* This can happen if we asked for hashes that turn out to be equal in
696
           underlying ledger and the target. *)
697
        [%log debug] "Got sync response when we're already finished syncing" ;
×
698
        Deferred.unit )
×
699
      else
UNCOV
700
        let open Trust_system in
×
701
        (* If a peer misbehaves we still need the information we asked them for,
702
           so requeue in that case. *)
703
        let requeue_query () =
704
          Linear_pipe.write_without_pushback_if_open t.queries (root_hash, query)
×
705
        in
706
        let credit_fulfilled_request () =
UNCOV
707
          record_envelope_sender t.trust_system logger sender
×
708
            ( Actions.Fulfilled_request
709
            , Some
710
                ( "sync ledger query $query"
UNCOV
711
                , [ ("query", Query.to_yojson Addr.to_yojson query) ] ) )
×
712
        in
713
        let%bind _ =
714
          match (query, answer) with
UNCOV
715
          | Query.What_contents addr, Answer.Contents_are leaves -> (
×
716
              match add_content t addr leaves with
UNCOV
717
              | `Success ->
×
UNCOV
718
                  credit_fulfilled_request ()
×
719
              | `Hash_mismatch (expected, actual) ->
×
720
                  let%map () =
721
                    record_envelope_sender t.trust_system logger sender
×
722
                      ( Actions.Sent_bad_hash
723
                      , Some
724
                          ( "sent accounts $accounts for address $addr, they \
725
                             hash to $actual but we expected $expected"
726
                          , [ ( "accounts"
727
                              , `List (List.map ~f:Account.to_yojson leaves) )
×
728
                            ; ("addr", Addr.to_yojson addr)
×
729
                            ; ("actual", Hash.to_yojson actual)
×
730
                            ; ("expected", Hash.to_yojson expected)
×
731
                            ] ) )
732
                  in
733
                  requeue_query () )
×
UNCOV
734
          | Query.Num_accounts, Answer.Num_accounts (count, content_root) -> (
×
735
              match handle_num_accounts t count content_root with
UNCOV
736
              | `Success ->
×
UNCOV
737
                  credit_fulfilled_request ()
×
738
              | `Hash_mismatch (expected, actual) ->
×
739
                  let%map () =
740
                    record_envelope_sender t.trust_system logger sender
×
741
                      ( Actions.Sent_bad_hash
742
                      , Some
743
                          ( "Claimed num_accounts $count, content root hash \
744
                             $content_root_hash, that implies a root hash of \
745
                             $actual, we expected $expected"
746
                          , [ ("count", `Int count)
747
                            ; ("content_root_hash", Hash.to_yojson content_root)
×
748
                            ; ("actual", Hash.to_yojson actual)
×
749
                            ; ("expected", Hash.to_yojson expected)
×
750
                            ] ) )
751
                  in
752
                  requeue_query () )
×
UNCOV
753
          | ( Query.What_child_hashes (address, requested_depth)
×
754
            , Answer.Child_hashes_are hashes ) -> (
755
              match add_subtree t address hashes requested_depth with
756
              | `Hash_mismatch (expected, actual) ->
×
757
                  let%map () =
758
                    record_envelope_sender t.trust_system logger sender
×
759
                      ( Actions.Sent_bad_hash
760
                      , Some
761
                          ( "hashes sent for subtree on address $address merge \
762
                             to $actual_merge but we expected $expected_merge"
763
                          , [ ("actual_merge", Hash.to_yojson actual)
×
764
                            ; ("expected_merge", Hash.to_yojson expected)
×
765
                            ] ) )
766
                  in
767
                  requeue_query ()
×
768
              | `Invalid_length ->
×
769
                  let%map () =
770
                    record_envelope_sender t.trust_system logger sender
×
771
                      ( Actions.Sent_bad_hash
772
                      , Some
773
                          ( "hashes sent for subtree on address $address must \
774
                             be a power of 2 in the range 2-2^$depth"
775
                          , [ ( "depth"
776
                              , `Int ledger_sync_config.max_subtree_depth )
777
                            ] ) )
778
                  in
779
                  requeue_query ()
×
UNCOV
780
              | `Good children_to_verify ->
×
781
                  Array.iter children_to_verify ~f:(fun (addr, hash) ->
UNCOV
782
                      handle_node t addr hash ) ;
×
UNCOV
783
                  credit_fulfilled_request () )
×
784
          | query, answer ->
×
785
              let%map () =
786
                record_envelope_sender t.trust_system logger sender
×
787
                  ( Actions.Violated_protocol
788
                  , Some
789
                      ( "Answered question we didn't ask! Query was $query \
790
                         answer was $answer"
791
                      , [ ("query", Query.to_yojson Addr.to_yojson query)
×
792
                        ; ( "answer"
793
                          , Answer.to_yojson Hash.to_yojson Account.to_yojson
×
794
                              answer )
795
                        ] ) )
796
              in
797
              requeue_query ()
×
798
        in
UNCOV
799
        if
×
800
          Root_hash.equal
UNCOV
801
            (Option.value_exn t.desired_root)
×
UNCOV
802
            (MT.merkle_root t.tree)
×
UNCOV
803
        then (
×
UNCOV
804
          [%str_log trace] Snarked_ledger_synced ;
×
UNCOV
805
          all_done t ) ;
×
UNCOV
806
        Deferred.unit
×
807
    in
808
    Linear_pipe.iter t.answers ~f:handle_answer
809

810
  let new_goal t h ~data ~equal =
UNCOV
811
    let open (val t.context) in
×
812
    let should_skip =
813
      match t.desired_root with
UNCOV
814
      | None ->
×
815
          false
UNCOV
816
      | Some h' ->
×
UNCOV
817
          Root_hash.equal h h'
×
818
    in
UNCOV
819
    if not should_skip then (
×
820
      Option.iter t.desired_root ~f:(fun root_hash ->
UNCOV
821
          [%log debug]
×
822
            ~metadata:
UNCOV
823
              [ ("old_root_hash", Root_hash.to_yojson root_hash)
×
UNCOV
824
              ; ("new_root_hash", Root_hash.to_yojson h)
×
825
              ]
826
            "New_goal: changing target from $old_root_hash to $new_root_hash" ) ;
UNCOV
827
      Ivar.fill_if_empty t.validity_listener
×
828
        (`Target_changed (t.desired_root, h)) ;
UNCOV
829
      t.validity_listener <- Ivar.create () ;
×
830
      t.desired_root <- Some h ;
831
      t.auxiliary_data <- Some data ;
832
      Linear_pipe.write_without_pushback_if_open t.queries (h, Num_accounts) ;
UNCOV
833
      `New )
×
834
    else if
×
835
      Option.fold t.auxiliary_data ~init:false ~f:(fun _ saved_data ->
836
          equal data saved_data )
×
837
    then (
×
838
      [%log debug] "New_goal to same hash, not doing anything" ;
×
839
      `Repeat )
×
840
    else (
×
841
      t.auxiliary_data <- Some data ;
842
      `Update_data )
843

844
  let rec valid_tree t =
UNCOV
845
    match%bind Ivar.read t.validity_listener with
×
UNCOV
846
    | `Ok ->
×
UNCOV
847
        return (t.tree, Option.value_exn t.auxiliary_data)
×
UNCOV
848
    | `Target_changed _ ->
×
849
        valid_tree t
850

851
  let peek_valid_tree t =
UNCOV
852
    Option.bind (Ivar.peek t.validity_listener) ~f:(function
×
853
      | `Ok ->
×
854
          Some t.tree
855
      | `Target_changed _ ->
×
856
          None )
857

858
  let wait_until_valid t h =
UNCOV
859
    if not (Root_hash.equal h (desired_root_exn t)) then
×
860
      return (`Target_changed (t.desired_root, h))
×
861
    else
UNCOV
862
      Deferred.map (Ivar.read t.validity_listener) ~f:(function
×
UNCOV
863
        | `Target_changed payload ->
×
864
            `Target_changed payload
UNCOV
865
        | `Ok ->
×
866
            `Ok t.tree )
867

868
  let fetch t rh ~data ~equal =
UNCOV
869
    ignore (new_goal t rh ~data ~equal : [ `New | `Repeat | `Update_data ]) ;
×
870
    wait_until_valid t rh
871

872
  let create mt ~context ~trust_system =
UNCOV
873
    let qr, qw = Linear_pipe.create () in
×
UNCOV
874
    let ar, aw = Linear_pipe.create () in
×
UNCOV
875
    let t =
×
876
      { desired_root = None
877
      ; auxiliary_data = None
878
      ; tree = mt
879
      ; trust_system
880
      ; answers = ar
881
      ; answer_writer = aw
882
      ; queries = qw
883
      ; query_reader = qr
UNCOV
884
      ; waiting_parents = Addr.Table.create ()
×
UNCOV
885
      ; waiting_content = Addr.Table.create ()
×
UNCOV
886
      ; validity_listener = Ivar.create ()
×
887
      ; context
888
      }
889
    in
UNCOV
890
    don't_wait_for (main_loop t) ;
×
UNCOV
891
    t
×
892

893
  let apply_or_queue_diff _ _ =
894
    (* Need some interface for the diffs, not sure the layering is right here. *)
895
    failwith "todo"
×
896

897
  let merkle_path_at_addr _ = failwith "no"
×
898

899
  let get_account_at_addr _ = failwith "no"
×
900
end
44✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc