• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

freeeve / rustychickpeas / 27909064898

21 Jun 2026 03:33PM UTC coverage: 89.881% (+0.3%) from 89.551%
27909064898

push

github

freeeve
chore(lint): rustfmt the reader crate (CI fmt gate)

Pure rustfmt reflow of committed reader code (function-signature wrapping,
multi-line arg lists) — left uncommitted in a prior session, which is why CI's
fmt --check (and now the pre-push hook) flagged it. No logic change.

3322 of 3696 relevant lines covered (89.88%)

2.92 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/rustychickpeas-python/src/graph_snapshot.rs
1
//! GraphSnapshot Python wrapper
2

3
use crate::direction::Direction;
4
use crate::node::Node;
5
use crate::relationship::Relationship;
6
use crate::utils::{py_to_property_value, value_id_to_pyobject};
7
use pyo3::ffi;
8
use pyo3::prelude::*;
9
use pyo3::IntoPyObjectExt;
10
use roaring::RoaringBitmap;
11
use rustychickpeas_core::bitmap::NodeSet;
12
use rustychickpeas_core::types::PropertyKey;
13
use rustychickpeas_core::{
14
    AggOp, ColumnDtype, GraphSnapshot as CoreGraphSnapshot, Label, RelationshipRef,
15
    RelationshipType, ValueId,
16
};
17
use std::os::raw::{c_char, c_int, c_void};
18
use std::sync::{Arc, Mutex, PoisonError};
19

20
/// Python wrapper for GraphSnapshot
21
#[pyclass(name = "GraphSnapshot")]
22
pub struct GraphSnapshot {
23
    pub(crate) snapshot: std::sync::Arc<CoreGraphSnapshot>,
24
}
25

26
/// Iterator over the node IDs of a GraphSnapshot
27
///
28
/// Node IDs in a finalized snapshot are dense in `0..n_nodes` (the same range
29
/// accepted by `GraphSnapshot.node()`), so iteration only needs the bounds.
30
#[pyclass(name = "NodeIdIter")]
31
pub struct NodeIdIter {
32
    current: u32,
33
    end: u32,
34
}
35

36
#[pymethods]
37
impl NodeIdIter {
38
    fn __iter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> {
39
        slf
40
    }
41

42
    fn __next__(mut slf: PyRefMut<'_, Self>) -> Option<u32> {
43
        if slf.current < slf.end {
44
            let id = slf.current;
45
            slf.current += 1;
46
            Some(id)
47
        } else {
48
            None
49
        }
50
    }
51
}
52

53
impl GraphSnapshot {
54
    /// Get string ID from string
55
    /// Uses the reverse index in Atoms for O(1) lookup
56
    fn get_string_id(&self, s: &str) -> Option<u32> {
57
        self.snapshot.atoms.get_id(s)
58
    }
59

60
    /// Get label from string
61
    fn label_from_str(&self, s: &str) -> Option<Label> {
62
        self.get_string_id(s).map(Label::new)
63
    }
64

65
    /// Get relationship type from string
66
    fn rel_type_from_str(&self, s: &str) -> Option<RelationshipType> {
67
        self.get_string_id(s).map(RelationshipType::new)
68
    }
69

70
    /// Get property key from string
71
    fn property_key_from_str(&self, s: &str) -> Option<PropertyKey> {
72
        self.get_string_id(s)
73
    }
74

75
    /// Convert a Python value to a core [`ValueId`], resolving a string to its
76
    /// interned atom id. Returns `Ok(None)` when a string value is not interned
77
    /// in this snapshot — no node can carry it, so predicate/lookup callers
78
    /// short-circuit to "no match" rather than erroring.
79
    fn py_value_to_id_opt(&self, value: &Bound<'_, PyAny>) -> PyResult<Option<ValueId>> {
80
        use rustychickpeas_core::PropertyValue;
81
        Ok(match py_to_property_value(value)? {
82
            PropertyValue::String(s) => self.get_string_id(&s).map(ValueId::Str),
83
            PropertyValue::Integer(i) => Some(ValueId::I64(i)),
84
            PropertyValue::Float(f) => Some(ValueId::from_f64(f)),
85
            PropertyValue::Boolean(b) => Some(ValueId::Bool(b)),
86
            PropertyValue::InternedString(_) => {
87
                return Err(PyErr::new::<pyo3::exceptions::PyValueError, _>(
88
                    "InternedString not supported here",
89
                ));
90
            }
91
        })
92
    }
93
}
94

95
impl GraphSnapshot {
96
    /// Internal constructor (not exposed to Python)
97
    /// Takes ownership of a GraphSnapshot
98
    pub(crate) fn new(snapshot: CoreGraphSnapshot) -> Self {
99
        Self {
100
            snapshot: std::sync::Arc::new(snapshot),
101
        }
102
    }
103

104
    /// Internal constructor from Arc (for manager.get_graph_snapshot)
105
    pub(crate) fn from_arc(snapshot: std::sync::Arc<CoreGraphSnapshot>) -> Self {
106
        Self { snapshot }
107
    }
108
}
109

110
#[pymethods]
111
impl GraphSnapshot {
112
    fn __repr__(&self) -> String {
113
        let version = self
114
            .snapshot
115
            .version()
116
            .map(|v| v.to_string())
117
            .unwrap_or_else(|| "None".to_string());
118
        format!(
119
            "GraphSnapshot(nodes={}, rels={}, version={})",
120
            self.snapshot.n_nodes, self.snapshot.n_rels, version
121
        )
122
    }
123

124
    fn __len__(&self) -> usize {
125
        self.snapshot.n_nodes as usize
126
    }
127

128
    /// Iterate over all node IDs in the snapshot (0..n_nodes)
129
    ///
130
    /// Yields exactly the node IDs accepted by `node()`.
131
    fn __iter__(&self) -> NodeIdIter {
132
        NodeIdIter {
133
            current: 0,
134
            end: self.snapshot.n_nodes,
135
        }
136
    }
137

138
    /// Get number of nodes
139
    fn node_count(&self) -> u32 {
140
        self.snapshot.n_nodes
141
    }
142

143
    /// Get number of relationships
144
    fn relationship_count(&self) -> u64 {
145
        self.snapshot.n_rels
146
    }
147

148
    /// Get node labels
149
    fn node_labels(&self, node_id: u32) -> PyResult<Vec<String>> {
150
        // GraphSnapshot doesn't store labels per node directly
151
        // We need to iterate through label_index to find which labels contain this node
152
        let mut labels = Vec::new();
153
        for (label, node_set) in &self.snapshot.label_index {
154
            if node_set.contains(node_id) {
155
                if let Some(label_str) = self.snapshot.resolve_string(label.id()) {
156
                    labels.push(label_str.to_string());
157
                }
158
            }
159
        }
160
        Ok(labels)
161
    }
162

163
    /// Get nodes with a specific label
164
    fn nodes_with_label(&self, label: String) -> PyResult<Vec<u32>> {
165
        // Call Rust function - it returns None if label doesn't exist
166
        // We need to distinguish between "label doesn't exist" vs "label exists but no nodes"
167
        // Since nodes_with_label_id returns Option<&NodeSet>, None means label not in index
168
        let label_id = self.label_from_str(&label);
169
        if label_id.is_none() {
170
            return Err(PyErr::new::<pyo3::exceptions::PyValueError, _>(format!(
171
                "Label '{}' not found",
172
                label
173
            )));
174
        }
175

176
        // Label exists, now get nodes (will return Some even if empty)
177
        if let Some(node_set) = self.snapshot.nodes_with_label(&label) {
178
            Ok(node_set.iter().collect())
179
        } else {
180
            // This shouldn't happen if label exists, but handle it
181
            Ok(Vec::new())
182
        }
183
    }
184

185
    /// Get a Node object for the given node ID
186
    fn node(&self, node_id: u32) -> PyResult<Node> {
187
        if node_id >= self.snapshot.n_nodes {
188
            return Err(PyErr::new::<pyo3::exceptions::PyValueError, _>(format!(
189
                "Node ID {} out of range (max: {})",
190
                node_id,
191
                self.snapshot.n_nodes.saturating_sub(1)
192
            )));
193
        }
194
        Ok(Node {
195
            snapshot: self.snapshot.clone(),
196
            node_id,
197
        })
198
    }
199

200
    /// Get relationships (neighbors) of a node with optional type filtering
201
    ///
202
    /// # Arguments
203
    /// * `node_id` - The node ID
204
    /// * `direction` - Direction of relationships (Outgoing, Incoming, Both)
205
    /// * `rel_types` - Optional list of relationship types to filter by
206
    #[pyo3(signature = (node_id, direction, rel_types=None))]
207
    fn relationships(
208
        &self,
209
        node_id: u32,
210
        direction: Direction,
211
        rel_types: Option<Vec<String>>,
212
    ) -> PyResult<Vec<Relationship>> {
213
        use rustychickpeas_core::types::RelationshipType;
214

215
        if node_id >= self.snapshot.n_nodes {
216
            return Err(PyErr::new::<pyo3::exceptions::PyValueError, _>(format!(
217
                "Node ID {} out of range (max: {})",
218
                node_id,
219
                self.snapshot.n_nodes.saturating_sub(1)
220
            )));
221
        }
222

223
        // Convert string types to RelationshipType IDs via the public resolver.
224
        let rel_type_ids: Option<Vec<RelationshipType>> = rel_types.as_ref().and_then(|types| {
225
            let ids: Vec<RelationshipType> = types
226
                .iter()
227
                .filter_map(|s| self.snapshot.rel_type(s))
228
                .collect();
229
            if ids.is_empty() && !types.is_empty() {
230
                return None;
231
            }
232
            Some(ids)
233
        });
234

235
        let mut relationships = Vec::new();
236

237
        // Handle outgoing relationships
238
        if matches!(direction, Direction::Outgoing | Direction::Both) {
239
            let start = self.snapshot.out_offsets[node_id as usize] as usize;
240
            let end = self.snapshot.out_offsets[node_id as usize + 1] as usize;
241

242
            for (idx, (&_neighbor, &rel_type)) in self.snapshot.out_nbrs[start..end]
243
                .iter()
244
                .zip(self.snapshot.out_types[start..end].iter())
245
                .enumerate()
246
            {
247
                let rel_csr_index = start + idx;
248

249
                // Apply type filter if provided
250
                if let Some(ref type_ids) = rel_type_ids {
251
                    if !type_ids.contains(&rel_type) {
252
                        continue;
253
                    }
254
                } else if rel_types.as_ref().map(|t| !t.is_empty()).unwrap_or(false) {
255
                    // Filter was provided but no types found - skip
256
                    continue;
257
                }
258

259
                relationships.push(Relationship {
260
                    snapshot: self.snapshot.clone(),
261
                    rel_index: rel_csr_index as u32,
262
                    is_outgoing: true,
263
                });
264
            }
265
        }
266

267
        // Handle incoming relationships
268
        if matches!(direction, Direction::Incoming | Direction::Both) {
269
            let start = self.snapshot.in_offsets[node_id as usize] as usize;
270
            let end = self.snapshot.in_offsets[node_id as usize + 1] as usize;
271

272
            for (idx, (&_neighbor, &rel_type)) in self.snapshot.in_nbrs[start..end]
273
                .iter()
274
                .zip(self.snapshot.in_types[start..end].iter())
275
                .enumerate()
276
            {
277
                let rel_csr_index = start + idx;
278

279
                // Apply type filter if provided
280
                if let Some(ref type_ids) = rel_type_ids {
281
                    if !type_ids.contains(&rel_type) {
282
                        continue;
283
                    }
284
                } else if rel_types.as_ref().map(|t| !t.is_empty()).unwrap_or(false) {
285
                    // Filter was provided but no types found - skip
286
                    continue;
287
                }
288

289
                relationships.push(Relationship {
290
                    snapshot: self.snapshot.clone(),
291
                    rel_index: rel_csr_index as u32,
292
                    is_outgoing: false,
293
                });
294
            }
295
        }
296

297
        Ok(relationships)
298
    }
299

300
    /// Neighbor node IDs in `direction`, optionally restricted to `rel_types`
301
    /// (deduplicated, ascending, when types are given).
302
    #[pyo3(signature = (node_id, direction, rel_types=None))]
303
    fn neighbor_ids(
304
        &self,
305
        node_id: u32,
306
        direction: Direction,
307
        rel_types: Option<Vec<String>>,
308
    ) -> Vec<u32> {
309
        match rel_types {
310
            None => self.snapshot.neighbors(node_id, direction.into()).collect(),
311
            Some(types) => {
312
                let mut set = RoaringBitmap::new();
313
                for t in &types {
314
                    for n in self
315
                        .snapshot
316
                        .neighbors_by_type(node_id, direction.into(), t.as_str())
317
                    {
318
                        set.insert(n);
319
                    }
320
                }
321
                set.iter().collect()
322
            }
323
        }
324
    }
325

326
    /// Histogram of the neighbours reached from `sources` via `rel_type` rels in
327
    /// `direction`: for each source node, count how many of its `rel_type`
328
    /// neighbours land on each target. Returns a dict mapping target node id to
329
    /// count. The whole aggregation runs in Rust on a single call, so it is far
330
    /// faster than counting neighbours in a Python loop.
331
    fn neighbor_counts(
332
        &self,
333
        sources: Vec<u32>,
334
        direction: Direction,
335
        rel_type: &str,
336
    ) -> std::collections::HashMap<u32, usize> {
337
        // Core returns a hashbrown map; collect into a std map so PyO3 hands Python a dict.
338
        self.snapshot
339
            .neighbor_counts(sources, direction.into(), rel_type)
340
            .into_iter()
341
            .collect()
342
    }
343

344
    /// Get neighbors of a node as Node objects
345
    /// Returns a list of Node objects for neighbors in the specified direction
346
    #[pyo3(signature = (node_id, direction, rel_types=None))]
347
    fn neighbors(
348
        &self,
349
        node_id: u32,
350
        direction: Direction,
351
        rel_types: Option<Vec<String>>,
352
    ) -> PyResult<Vec<Node>> {
353
        // Get relationships (optionally type-filtered) and extract neighbor IDs.
354
        let rels = self.relationships(node_id, direction, rel_types)?;
355
        let mut neighbor_ids = Vec::new();
356

357
        for rel in rels {
358
            let neighbor_id = if rel.is_outgoing {
359
                // For outgoing relationships, the end node is in out_nbrs
360
                let idx = rel.rel_index as usize;
361
                if idx < self.snapshot.out_nbrs.len() {
362
                    self.snapshot.out_nbrs[idx]
363
                } else {
364
                    continue; // Skip invalid relationship
365
                }
366
            } else {
367
                // For incoming relationships, the start node (neighbor) is in in_nbrs
368
                let idx = rel.rel_index as usize;
369
                if idx < self.snapshot.in_nbrs.len() {
370
                    self.snapshot.in_nbrs[idx]
371
                } else {
372
                    continue; // Skip invalid relationship
373
                }
374
            };
375
            neighbor_ids.push(neighbor_id);
376
        }
377

378
        Ok(neighbor_ids
379
            .into_iter()
380
            .map(|id| Node {
381
                snapshot: self.snapshot.clone(),
382
                node_id: id,
383
            })
384
            .collect())
385
    }
386

387
    /// Degree of a node — O(1) from the CSR offsets when untyped; with
388
    /// `rel_type`, the count of neighbors reached via that type.
389
    #[pyo3(signature = (node_id, direction, rel_type=None))]
390
    fn degree(&self, node_id: u32, direction: Direction, rel_type: Option<&str>) -> usize {
391
        match rel_type {
392
            Some(rt) => self
393
                .snapshot
394
                .neighbors_by_type(node_id, direction.into(), rt)
395
                .count(),
396
            None => crate::utils::csr_degree(&self.snapshot, node_id, direction.into()),
397
        }
398
    }
399

400
    /// Whether `node_id` carries `label` — an O(1) label-index check, vs the
401
    /// `"X" in node_labels(n)` scan it replaces.
402
    fn has_label(&self, node_id: u32, label: &str) -> bool {
403
        self.snapshot.has_label(node_id, label)
404
    }
405

406
    /// Whether `node_id` has any neighbor via `rel_type` in `direction`
407
    /// (existence check, short-circuits on the first match).
408
    fn has_rel(&self, node_id: u32, direction: Direction, rel_type: &str) -> bool {
409
        self.snapshot.has_rel(node_id, direction.into(), rel_type)
410
    }
411

412
    /// Whether `node_id` has a neighbor (via `rel_type`, `direction`) whose node
413
    /// property `key` equals `value` — resolves the value once, then compares ids
414
    /// per neighbor (vs a per-neighbor Python property read).
415
    fn has_neighbor_with_property(
416
        &self,
417
        node_id: u32,
418
        direction: Direction,
419
        rel_type: &str,
420
        key: &str,
421
        value: &Bound<'_, PyAny>,
422
    ) -> PyResult<bool> {
423
        let Some(value_id) = self.py_value_to_id_opt(value)? else {
424
            return Ok(false);
425
        };
426
        Ok(self.snapshot.has_neighbor_with_property(
427
            node_id,
428
            direction.into(),
429
            rel_type,
430
            key,
431
            value_id,
432
        ))
433
    }
434

435
    /// The smallest node carrying `label` with property `key` == `value`, or
436
    /// `None` — collapses `nodes_with_property(..)[0]`, label-scoped so a `name`
437
    /// shared across labels stays unambiguous.
438
    fn node_with_label_property(
439
        &self,
440
        label: &str,
441
        key: &str,
442
        value: &Bound<'_, PyAny>,
443
    ) -> PyResult<Option<u32>> {
444
        let Some(value_id) = self.py_value_to_id_opt(value)? else {
445
            return Ok(None);
446
        };
447
        Ok(self.snapshot.node_with_label_property(label, key, value_id))
448
    }
449

450
    /// First neighbor of `node_id` via `rel_type` in `direction`, or `None` —
451
    /// returns the id (not a `Node`), short-circuiting the scan.
452
    fn first_neighbor(&self, node_id: u32, direction: Direction, rel_type: &str) -> Option<u32> {
453
        self.snapshot
454
            .first_neighbor(node_id, direction.into(), rel_type)
455
    }
456

457
    /// Follow a fixed chain of `(direction, rel_type)` steps from `start`, taking
458
    /// the first neighbor at each; `None` if a step has no neighbor. Returns the
459
    /// final node id (not a list of `Node`s).
460
    fn follow(&self, start: u32, steps: Vec<(Direction, String)>) -> Option<u32> {
461
        let steps: Vec<(rustychickpeas_core::Direction, &str)> = steps
462
            .iter()
463
            .map(|(d, r)| ((*d).into(), r.as_str()))
464
            .collect();
465
        self.snapshot.follow(start, &steps)
466
    }
467

468
    /// The root each node reaches by following the *functional* `rel` in `direction`
469
    /// (each node has one successor — e.g. a message's `replyOf` thread root); a node
470
    /// already terminal maps to itself. Returns a `NodeArray` array indexed by node id —
471
    /// `roots[node]` or `memoryview(roots)` in a hot loop. The forest-root array is
472
    /// built once and cached on the snapshot, so this is the bulk form to reach for
473
    /// over a per-node `root_via`. `None` if `rel` is unknown.
474
    fn roots_via(&self, rel: &str, direction: Direction) -> Option<NodeArray> {
475
        let rt = self.snapshot.relationship_type_from_str(rel)?;
476
        let inner = self.snapshot.chain_roots(direction.into(), rt);
477
        let len = inner.len() as ffi::Py_ssize_t;
478
        Some(NodeArray {
479
            inner,
480
            shape: [len],
481
            strides: [4],
482
        })
483
    }
484

485
    /// The root of a single `node` via the functional `rel` in `direction` (see
486
    /// `roots_via`). Convenience for a one-off lookup; in a per-node loop prefer
487
    /// `roots_via` and index it. `None` if `rel` is unknown.
488
    fn root_via(&self, node: u32, rel: &str, direction: Direction) -> Option<u32> {
489
        let rt = self.snapshot.relationship_type_from_str(rel)?;
490
        Some(self.snapshot.chain_root(node, direction.into(), rt))
491
    }
492

493
    /// The single neighbor each node reaches via the *functional* `rel` in `direction`
494
    /// (one hop — e.g. a message's `hasCreator` -> its creator). The depth-1 sibling of
495
    /// `roots_via`: where that follows the chain to its terminal, this takes one step.
496
    /// Returns a `NodeArray` indexed by node id; a node with no such neighbor maps to
497
    /// `u32::MAX` (4294967295). Built fresh each call (one `first_neighbor` per node,
498
    /// GIL released), so hold the result for a hot loop. `None` if `rel` is unknown.
499
    fn neighbor_via(&self, py: Python<'_>, rel: &str, direction: Direction) -> Option<NodeArray> {
500
        let rt = self.snapshot.relationship_type_from_str(rel)?;
501
        let dir: rustychickpeas_core::Direction = direction.into();
502
        let snapshot = self.snapshot.clone();
503
        let inner: Arc<[u32]> = py.allow_threads(move || {
504
            let v: Vec<u32> = (0..snapshot.n_nodes)
505
                .map(|node| snapshot.first_neighbor(node, dir, rt).unwrap_or(u32::MAX))
506
                .collect();
507
            v.into()
508
        });
509
        let len = inner.len() as ffi::Py_ssize_t;
510
        Some(NodeArray {
511
            inner,
512
            shape: [len],
513
            strides: [4],
514
        })
515
    }
516

517
    /// Fold relationship `rel` (in `direction`) into a `PairWeights` map by projecting
518
    /// both endpoints of each rel through `projection` (a `NodeArray`, e.g. from
519
    /// `neighbor_via` or `roots_via`) — the one-mode / bipartite projection ("network
520
    /// folding") of a relation onto a derived node set. For each `rel` rel `a -> b`,
521
    /// the unordered pair `(min, max)` of `projection[a]` / `projection[b]` gets one
522
    /// count; self-pairs and endpoints mapping to the `u32::MAX` sentinel are skipped.
523
    /// Runs the parallel core kernel with the GIL released. The result stays resident
524
    /// (no per-pair Python object) so it can drive a native weighted `dijkstra` without
525
    /// a per-rel callback; `to_dict()` materializes it. E.g. BI Q19's person
526
    /// interaction graph: ``g.fold_via("replyOf", Direction.Outgoing,
527
    /// g.neighbor_via("hasCreator", Direction.Incoming))``.
528
    fn fold_via(
529
        &self,
530
        py: Python<'_>,
531
        rel: &str,
532
        direction: Direction,
533
        projection: &NodeArray,
534
    ) -> PairWeights {
535
        let snapshot = self.snapshot.clone();
536
        let dir: rustychickpeas_core::Direction = direction.into();
537
        let rel = rel.to_owned();
538
        let proj = projection.inner.clone();
539
        let map: std::collections::HashMap<(u32, u32), u64> = py.allow_threads(move || {
540
            snapshot
541
                .fold_via(&rel, dir, proj.as_ref())
542
                .into_iter()
543
                .collect()
544
        });
545
        PairWeights {
546
            inner: Arc::new(map),
547
        }
548
    }
549

550
    /// Single-source weighted shortest paths (Dijkstra) from `source` along `rel` in
551
    /// `direction`, with rel costs derived from a resident `weights` map (`PairWeights`,
552
    /// e.g. from `fold_via`). The cost of rel `(u, v)` is `1.0 / (weights[(u, v)] + base)`;
553
    /// a pair absent from `weights` is untraversable when `prune_missing` (else costs
554
    /// `1.0 / base`). Returns `{node_id: cost}` for every node reached (the source maps to
555
    /// `0.0`); pass `target` to stop once it is settled. The weight lookup runs inside the
556
    /// native kernel with the GIL released — no per-rel Python callback. E.g. BI Q19's
557
    /// interaction path: ``g.dijkstra(p1, Direction.Outgoing, "knows", weights=interaction,
558
    /// base=0.0, prune_missing=True)``.
559
    #[allow(clippy::too_many_arguments)]
560
    #[pyo3(signature = (source, direction, rel, *, weights, base=0.0, prune_missing=false, target=None))]
561
    fn dijkstra(
562
        &self,
563
        py: Python<'_>,
564
        source: u32,
565
        direction: Direction,
566
        rel: &str,
567
        weights: &PairWeights,
568
        base: f64,
569
        prune_missing: bool,
570
        target: Option<u32>,
571
    ) -> std::collections::HashMap<u32, f64> {
572
        let snapshot = self.snapshot.clone();
573
        let dir: rustychickpeas_core::Direction = direction.into();
574
        let rel = rel.to_owned();
575
        let map = weights.inner.clone();
576
        py.allow_threads(move || {
577
            let paths = snapshot.dijkstra(source, dir, rel.as_str(), target, |from, r| {
578
                let key = if from < r.neighbor {
579
                    (from, r.neighbor)
580
                } else {
581
                    (r.neighbor, from)
582
                };
583
                match map.get(&key) {
584
                    Some(&w) => 1.0 / (w as f64 + base),
585
                    None if prune_missing => f64::INFINITY,
586
                    None => 1.0 / base,
587
                }
588
            });
589
            paths.into_distances().into_iter().collect()
590
        })
591
    }
592

593
    /// Bulk-read a node's `rel_type` rels in `direction` as aligned arrays: the
594
    /// neighbor ids plus one Python list per requested property key (read from the
595
    /// rel column by CSR position, each column resolved once). The property-bearing
596
    /// bulk sibling of `neighbor_ids` — avoids the per-rel `Relationship` object and
597
    /// per-key `get_property` of `relationships()`. Returns `(neighbors, [values,
598
    /// ...])` aligned by index; a property absent on a rel is `None`.
599
    fn rels_with_props(
600
        &self,
601
        py: Python<'_>,
602
        node_id: u32,
603
        direction: Direction,
604
        rel_type: &str,
605
        prop_keys: Vec<String>,
606
    ) -> PyResult<PyObject> {
607
        use pyo3::types::PyList;
608
        use rustychickpeas_core::{BoolCol, ColumnDtype, Direction as CoreDir, F64Col, I64Col};
609
        enum H<'a> {
610
            I64(I64Col<'a>),
611
            F64(F64Col<'a>),
612
            Bool(BoolCol<'a>),
613
            None,
614
        }
615
        let snap = &self.snapshot;
616
        let dir: CoreDir = direction.into();
617
        let neighbors = PyList::empty(py);
618
        let cols: Vec<Bound<PyList>> = (0..prop_keys.len()).map(|_| PyList::empty(py)).collect();
619
        let rt = match snap.rel_type(rel_type) {
620
            Some(rt) if (node_id as usize) + 1 < snap.out_offsets.len() => rt,
621
            _ => return (neighbors, cols).into_py_any(py),
622
        };
623
        // Resolve each rel column to a typed reader once (not per rel).
624
        let hoisted: Vec<H> = prop_keys
625
            .iter()
626
            .map(|k| match snap.rel_col(k).map(|c| c.dtype()) {
627
                Some(ColumnDtype::I64) => H::I64(snap.rel_col(k).unwrap().i64()),
628
                Some(ColumnDtype::F64) => H::F64(snap.rel_col(k).unwrap().f64()),
629
                Some(ColumnDtype::Bool) => H::Bool(snap.rel_col(k).unwrap().bool()),
630
                _ => H::None,
631
            })
632
            .collect();
633
        let n = node_id as usize;
634
        let mut ranges: Vec<(usize, usize, bool)> = Vec::new();
635
        if matches!(dir, CoreDir::Outgoing | CoreDir::Both) {
636
            ranges.push((
637
                snap.out_offsets[n] as usize,
638
                snap.out_offsets[n + 1] as usize,
639
                true,
640
            ));
641
        }
642
        if matches!(dir, CoreDir::Incoming | CoreDir::Both) {
643
            ranges.push((
644
                snap.in_offsets[n] as usize,
645
                snap.in_offsets[n + 1] as usize,
646
                false,
647
            ));
648
        }
649
        for (s, e, outgoing) in ranges {
650
            for i in s..e {
651
                let (rtype, nbr, pos) = if outgoing {
652
                    (snap.out_types[i], snap.out_nbrs[i], i as u32)
653
                } else {
654
                    (
655
                        snap.in_types[i],
656
                        snap.in_nbrs[i],
657
                        snap.in_to_out.get(i).copied().unwrap_or(i as u32),
658
                    )
659
                };
660
                if rtype != rt {
661
                    continue;
662
                }
663
                neighbors.append(nbr)?;
664
                for (j, h) in hoisted.iter().enumerate() {
665
                    let v: PyObject = match h {
666
                        H::I64(c) => c.get(pos).into_py_any(py)?,
667
                        H::F64(c) => c.get(pos).into_py_any(py)?,
668
                        H::Bool(c) => c.get(pos).into_py_any(py)?,
669
                        H::None => py.None(),
670
                    };
671
                    cols[j].append(v)?;
672
                }
673
            }
674
        }
675
        (neighbors, cols).into_py_any(py)
676
    }
677

678
    /// Like `rels_with_props` but returns a `RelView` of zero-copy buffer arrays
679
    /// (`memoryview`-able, no per-value Python boxing): `.neighbors` (u32) and
680
    /// `.col(key)` (i64/f64). The native gather runs with the GIL released; a
681
    /// reduction like `sum(memoryview(v.col("amt")))` then runs at C speed. Missing
682
    /// props default to 0 (a typed buffer can't hold `None`).
683
    fn rel_view(
684
        &self,
685
        py: Python<'_>,
686
        node_id: u32,
687
        direction: Direction,
688
        rel_type: String,
689
        prop_keys: Vec<String>,
690
    ) -> RelView {
691
        use rustychickpeas_core::{ColumnDtype, Direction as CoreDir, F64Col, I64Col};
692
        enum KeyAcc<'a> {
693
            I64(I64Col<'a>, Vec<i64>),
694
            F64(F64Col<'a>, Vec<f64>),
695
            None(Vec<i64>),
696
        }
697
        let snapshot = self.snapshot.clone();
698
        let dir: CoreDir = direction.into();
699
        let n = node_id as usize;
700
        let (neighbors, cols): (Vec<u32>, Vec<(String, RelArrayData)>) =
701
            py.allow_threads(move || {
702
                let snap = &snapshot;
703
                let mut nbrs: Vec<u32> = Vec::new();
704
                let mut accs: Vec<KeyAcc> = prop_keys
705
                    .iter()
706
                    .map(|k| match snap.rel_col(k).map(|c| c.dtype()) {
707
                        Some(ColumnDtype::I64) => {
708
                            KeyAcc::I64(snap.rel_col(k).unwrap().i64(), Vec::new())
709
                        }
710
                        Some(ColumnDtype::F64) => {
711
                            KeyAcc::F64(snap.rel_col(k).unwrap().f64(), Vec::new())
712
                        }
713
                        _ => KeyAcc::None(Vec::new()),
714
                    })
715
                    .collect();
716
                let rt = snap.rel_type(&rel_type);
717
                if let (Some(rt), true) = (rt, n + 1 < snap.out_offsets.len()) {
718
                    let mut ranges: Vec<(usize, usize, bool)> = Vec::new();
719
                    if matches!(dir, CoreDir::Outgoing | CoreDir::Both) {
720
                        ranges.push((
721
                            snap.out_offsets[n] as usize,
722
                            snap.out_offsets[n + 1] as usize,
723
                            true,
724
                        ));
725
                    }
726
                    if matches!(dir, CoreDir::Incoming | CoreDir::Both) {
727
                        ranges.push((
728
                            snap.in_offsets[n] as usize,
729
                            snap.in_offsets[n + 1] as usize,
730
                            false,
731
                        ));
732
                    }
733
                    for (s, e, outgoing) in ranges {
734
                        for i in s..e {
735
                            let (rtype, nbr, pos) = if outgoing {
736
                                (snap.out_types[i], snap.out_nbrs[i], i as u32)
737
                            } else {
738
                                (
739
                                    snap.in_types[i],
740
                                    snap.in_nbrs[i],
741
                                    snap.in_to_out.get(i).copied().unwrap_or(i as u32),
742
                                )
743
                            };
744
                            if rtype != rt {
745
                                continue;
746
                            }
747
                            nbrs.push(nbr);
748
                            for acc in accs.iter_mut() {
749
                                match acc {
750
                                    KeyAcc::I64(c, v) => v.push(c.get(pos).unwrap_or(0)),
751
                                    KeyAcc::F64(c, v) => v.push(c.get(pos).unwrap_or(0.0)),
752
                                    KeyAcc::None(v) => v.push(0),
753
                                }
754
                            }
755
                        }
756
                    }
757
                }
758
                let cols = prop_keys
759
                    .into_iter()
760
                    .zip(accs)
761
                    .map(|(k, acc)| match acc {
762
                        KeyAcc::I64(_, v) => (k, RelArrayData::I64(v.into())),
763
                        KeyAcc::F64(_, v) => (k, RelArrayData::F64(v.into())),
764
                        KeyAcc::None(v) => (k, RelArrayData::I64(v.into())),
765
                    })
766
                    .collect();
767
                (nbrs, cols)
768
            });
769
        RelView {
770
            neighbors: neighbors.into(),
771
            cols,
772
        }
773
    }
774

775
    /// Build a `NeighborGroups` query over each source node's `rel` neighbors (in
776
    /// `direction`): group each source's neighbors by a projected attribute and
777
    /// reduce per source. Nothing runs until a terminal (`.sizes()` /
778
    /// `.top_by_size(...)`). E.g. BI Q4's biggest single-country membership per
779
    /// forum: ``g.neighbor_groups(forums, "hasMember", Direction.Outgoing)
780
    /// .project([(Direction.Outgoing, "isLocatedIn"), (Direction.Outgoing, "isPartOf")])
781
    /// .top_by_size(100, tie="flid")``.
782
    fn neighbor_groups(
783
        &self,
784
        sources: Vec<u32>,
785
        rel: String,
786
        direction: Direction,
787
    ) -> NeighborGroups {
788
        NeighborGroups {
789
            snapshot: self.snapshot.clone(),
790
            sources,
791
            rel,
792
            direction: direction.into(),
793
            project: Vec::new(),
794
        }
795
    }
796

797
    /// The string property `key` of `node_id`, or `None` when absent **or empty**
798
    /// (a dense string column stores a missing value as `""`).
799
    fn prop_str(&self, node_id: u32, key: &str) -> Option<String> {
800
        self.snapshot.prop_str(node_id, key).map(str::to_string)
801
    }
802

803
    /// All nodes within `min_hops..=max_hops` of `seed`, expanding only along
804
    /// `rel_type` in `direction` — the typed k-hop neighborhood as a list of ids
805
    /// (excludes `seed`). `min_hops` defaults to 1.
806
    #[pyo3(signature = (seed, direction, rel_type, max_hops, min_hops=1))]
807
    fn neighborhood(
808
        &self,
809
        seed: u32,
810
        direction: Direction,
811
        rel_type: &str,
812
        max_hops: u32,
813
        min_hops: u32,
814
    ) -> Vec<u32> {
815
        self.snapshot
816
            .neighborhood(seed, direction.into(), rel_type, min_hops..=max_hops)
817
            .iter()
818
            .collect()
819
    }
820

821
    /// The dense `i64` column `key` as a list (one value per node id), or `None`
822
    /// when the column is absent or not a dense `i64` column. Built on the
823
    /// zero-copy slice reader; the slice is copied to cross PyO3.
824
    fn i64_column(&self, key: &str) -> Option<Vec<i64>> {
825
        self.snapshot
826
            .col(key)
827
            .map(|c| c.i64())
828
            .and_then(|c| c.as_slice().map(<[i64]>::to_vec))
829
    }
830

831
    /// The interned id (code) for `s` in this snapshot, or `None` if `s` was never
832
    /// interned (so no node can carry it). Resolve filter targets to codes once,
833
    /// then compare them against a string [`Column`]'s codes (dtype `'string'`)
834
    /// vectorized — e.g. `numpy.isin(numpy.asarray(g.column("lang")), [c1, c2])`.
835
    fn string_id(&self, s: &str) -> Option<u32> {
836
        self.get_string_id(s)
837
    }
838

839
    /// A dense property column as a self-describing [`Column`] (its dtype is
840
    /// intrinsic — no `.i64()` narrowing), or `None` when the key is absent or the
841
    /// column is not stored densely. The `Column` supports the buffer protocol, so
842
    /// `numpy.asarray(col)` / `pyarrow.py_buffer(col)` / `memoryview(col)` read it
843
    /// zero-copy, and `col.to_pylist()` gives a plain Python list.
844
    fn column(&self, key: &str) -> Option<Column> {
845
        Column::build(self.snapshot.clone(), key)
846
    }
847

848
    /// Low-level grouped reduction over dense `i64` node columns, run in Rust with
849
    /// the GIL released. Prefer the fluent [`GraphSnapshot::aggregate`] builder —
850
    /// this is the kernel it calls, kept public for direct use.
851
    ///
852
    /// Scans the nodes of each label in `labels`. A row counts toward `total` when
853
    /// it passes every `pre_filters` predicate `(column, op, value)` (op ∈
854
    /// `<,<=,>,>=,==,!=`); rows additionally passing every `group_filters` predicate
855
    /// are grouped. The group key is the label index (when `group_label`), then each
856
    /// `group_cols` value, then a bucket index per `group_bins` `(column, bounds)`
857
    /// (bucket = count of `bounds <= value`). Returns `(rows, total)` with
858
    /// `rows = [(key_tuple, count, sum), ...]`. All referenced columns must be dense
859
    /// `i64` columns; a missing label or non-dense/-i64 column raises `ValueError`.
860
    #[pyo3(signature = (labels, pre_filters=vec![], group_filters=vec![], group_label=false, group_cols=vec![], group_bins=vec![], sum_col=None))]
861
    #[allow(clippy::too_many_arguments, clippy::type_complexity)]
862
    fn group_reduce(
863
        &self,
864
        py: Python<'_>,
865
        labels: Vec<String>,
866
        pre_filters: Vec<(String, String, i64)>,
867
        group_filters: Vec<(String, String, i64)>,
868
        group_label: bool,
869
        group_cols: Vec<String>,
870
        group_bins: Vec<(String, Vec<i64>)>,
871
        sum_col: Option<String>,
872
    ) -> PyResult<(Vec<(Vec<i64>, u64, i64)>, u64)> {
873
        let mut group: Vec<GroupSpec> = group_cols.into_iter().map(GroupSpec::Col).collect();
874
        group.extend(group_bins.into_iter().map(|(c, e)| GroupSpec::Bin(c, e)));
875
        let agg = build_core_agg(
876
            &self.snapshot,
877
            &labels,
878
            &pre_filters,
879
            &group_filters,
880
            group_label,
881
            &group,
882
            sum_col.as_deref(),
883
            None,
884
            None,
885
            &[],
886
        )?;
887
        let res = py
888
            .allow_threads(|| agg.run())
889
            .map_err(|e| PyErr::new::<pyo3::exceptions::PyValueError, _>(e.to_string()))?;
890
        let rows = res
891
            .rows
892
            .into_iter()
893
            .map(|r| (r.key, r.count, r.sum))
894
            .collect();
895
        Ok((rows, res.total))
896
    }
897

898
    /// Start a fluent aggregation over the given node labels — the Pythonic front
899
    /// for [`group_reduce`](Self::group_reduce). Chain `.where(col, op, value)` /
900
    /// `.having(col, op, value)` / `.by(col)` / `.bin(col, bounds)` / `.by_label()` /
901
    /// `.sum(col)`, then `.run()` for a result with `.total` and self-describing dict
902
    /// `.rows` (the source label comes back as its name). The heavy scan runs in Rust
903
    /// with the GIL released — no numpy/pyarrow needed.
904
    #[pyo3(signature = (*labels))]
905
    fn aggregate(&self, labels: Vec<String>) -> Aggregation {
906
        Aggregation {
907
            snapshot: self.snapshot.clone(),
908
            labels,
909
            where_filters: Vec::new(),
910
            having_filters: Vec::new(),
911
            by_label: false,
912
            group: Vec::new(),
913
            sum_col: None,
914
            through: None,
915
            neighbor_filter: None,
916
            projected_filters: Vec::new(),
917
        }
918
    }
919

920
    /// Weighted (or unweighted) shortest-path cost from `source` to `target`, or
921
    /// `None` if `target` is unreachable.
922
    ///
923
    /// With `weight_property`, each followed rel costs that f64/i64 rel
924
    /// property (an rel that lacks it is skipped); without it, every rel costs
925
    /// 1.0 (a hop count). `rel_types` optionally restricts which relationship
926
    /// types are followed (all types when `None`). Weights must be non-negative;
927
    /// the search is a bidirectional Dijkstra.
928
    #[pyo3(signature = (source, target, direction=Direction::Both, rel_types=None, weight_property=None))]
929
    fn shortest_path(
930
        &self,
931
        py: Python<'_>,
932
        source: u32,
933
        target: u32,
934
        direction: Direction,
935
        rel_types: Option<Vec<String>>,
936
        weight_property: Option<String>,
937
    ) -> PyResult<Option<f64>> {
938
        if source >= self.snapshot.n_nodes || target >= self.snapshot.n_nodes {
939
            return Err(PyErr::new::<pyo3::exceptions::PyValueError, _>(format!(
940
                "Node ID out of range (max: {})",
941
                self.snapshot.n_nodes.saturating_sub(1)
942
            )));
943
        }
944
        let types: Vec<&str> = rel_types
945
            .as_ref()
946
            .map(|t| t.iter().map(|s| s.as_str()).collect())
947
            .unwrap_or_default();
948
        let snapshot = self.snapshot.clone();
949
        // No Python is called during the search, so release the GIL.
950
        let cost = py.allow_threads(move || {
951
            let weight = |_from: u32, rel: &RelationshipRef| -> f64 {
952
                match &weight_property {
953
                    Some(prop) => match snapshot.rel_prop(rel.pos, prop).map(|p| p.value()) {
954
                        Some(ValueId::F64(bits)) => f64::from_bits(bits),
955
                        Some(ValueId::I64(w)) => w as f64,
956
                        _ => f64::INFINITY,
957
                    },
958
                    None => 1.0,
959
                }
960
            };
961
            snapshot.weighted_shortest_path(source, target, direction.into(), &types[..], weight)
962
        });
963
        Ok(cost)
964
    }
965

966
    /// Get relationships by type using the type_index bitmap for O(1) lookup
967
    /// Returns Relationship objects for all relationships of the specified type
968
    fn relationships_with_type(&self, rel_type: String) -> PyResult<Vec<Relationship>> {
969
        let rel_type_id = self.rel_type_from_str(&rel_type).ok_or_else(|| {
970
            PyErr::new::<pyo3::exceptions::PyValueError, _>(format!(
971
                "Relationship type '{}' not found",
972
                rel_type
973
            ))
974
        })?;
975

976
        if let Some(bitmap) = self.snapshot.type_index.get(&rel_type_id) {
977
            let relationships: Vec<Relationship> = bitmap
978
                .iter()
979
                .map(|idx| Relationship {
980
                    snapshot: self.snapshot.clone(),
981
                    rel_index: idx,
982
                    is_outgoing: true,
983
                })
984
                .collect();
985
            Ok(relationships)
986
        } else {
987
            Ok(Vec::new())
988
        }
989
    }
990

991
    /// Get all nodes (returns all node IDs that have data: labels, rels, or properties)
992
    fn all_nodes(&self) -> PyResult<Vec<u32>> {
993
        use std::collections::HashSet;
994
        let mut nodes = HashSet::new();
995

996
        // Add nodes with labels
997
        for (_label, node_set) in &self.snapshot.label_index {
998
            for node_id in node_set.iter() {
999
                nodes.insert(node_id);
1000
            }
1001
        }
1002

1003
        // Add nodes with rels (check CSR arrays)
1004
        // Nodes with outgoing rels
1005
        for node_id in 0..self.snapshot.out_offsets.len().saturating_sub(1) {
1006
            let start = self.snapshot.out_offsets[node_id] as usize;
1007
            let end = self.snapshot.out_offsets[node_id + 1] as usize;
1008
            if start < end {
1009
                nodes.insert(node_id as u32);
1010
            }
1011
        }
1012

1013
        // Nodes with incoming rels
1014
        for node_id in 0..self.snapshot.in_offsets.len().saturating_sub(1) {
1015
            let start = self.snapshot.in_offsets[node_id] as usize;
1016
            let end = self.snapshot.in_offsets[node_id + 1] as usize;
1017
            if start < end {
1018
                nodes.insert(node_id as u32);
1019
            }
1020
        }
1021

1022
        // Add nodes with properties
1023
        for column in self.snapshot.columns.values() {
1024
            match column {
1025
                rustychickpeas_core::Column::DenseI64(_)
1026
                | rustychickpeas_core::Column::DenseF64(_)
1027
                | rustychickpeas_core::Column::DenseBool(_)
1028
                | rustychickpeas_core::Column::DenseStr(_) => {
1029
                    // Dense columns: all nodes from 0 to n_nodes-1 have this property
1030
                    // But we only want nodes that actually have data, so skip dense columns
1031
                    // (they're dense because most nodes have the property, but we can't tell which ones)
1032
                }
1033
                rustychickpeas_core::Column::SparseI64(pairs) => {
1034
                    for (node_id, _) in pairs {
1035
                        nodes.insert(*node_id);
1036
                    }
1037
                }
1038
                rustychickpeas_core::Column::SparseF64(pairs) => {
1039
                    for (node_id, _) in pairs {
1040
                        nodes.insert(*node_id);
1041
                    }
1042
                }
1043
                rustychickpeas_core::Column::SparseBool(pairs) => {
1044
                    for (node_id, _) in pairs {
1045
                        nodes.insert(*node_id);
1046
                    }
1047
                }
1048
                rustychickpeas_core::Column::SparseStr(pairs) => {
1049
                    for (node_id, _) in pairs {
1050
                        nodes.insert(*node_id);
1051
                    }
1052
                }
1053
                rustychickpeas_core::Column::RankI64 { present, .. }
1054
                | rustychickpeas_core::Column::RankF64 { present, .. }
1055
                | rustychickpeas_core::Column::RankBool { present, .. }
1056
                | rustychickpeas_core::Column::RankStr { present, .. } => {
1057
                    for pos in present.iter_ones() {
1058
                        nodes.insert(pos as u32);
1059
                    }
1060
                }
1061
            }
1062
        }
1063

1064
        let mut result: Vec<u32> = nodes.into_iter().collect();
1065
        result.sort_unstable();
1066
        Ok(result)
1067
    }
1068

1069
    /// Get all relationships as Relationship objects
1070
    /// Returns all relationships in the graph
1071
    fn all_relationships(&self) -> PyResult<Vec<Relationship>> {
1072
        let mut relationships = Vec::with_capacity(self.snapshot.out_nbrs.len());
1073

1074
        // Iterate through all outgoing relationships (each relationship appears once)
1075
        for idx in 0..self.snapshot.out_nbrs.len() {
1076
            relationships.push(Relationship {
1077
                snapshot: self.snapshot.clone(),
1078
                rel_index: idx as u32,
1079
                is_outgoing: true,
1080
            });
1081
        }
1082

1083
        Ok(relationships)
1084
    }
1085

1086
    /// Get a relationship by index
1087
    ///
1088
    /// Uses the outgoing relationship index (canonical index in out_nbrs).
1089
    /// Each relationship appears once in out_nbrs, so this is a unique identifier.
1090
    ///
1091
    /// # Arguments
1092
    /// * `rel_index` - The relationship index in out_nbrs (0 to n_rels-1)
1093
    fn relationship(&self, rel_index: u32) -> PyResult<Relationship> {
1094
        let max_index = self.snapshot.out_nbrs.len() as u32;
1095

1096
        if rel_index >= max_index {
1097
            return Err(PyErr::new::<pyo3::exceptions::PyValueError, _>(format!(
1098
                "Relationship index {} out of range (max: {})",
1099
                rel_index,
1100
                max_index.saturating_sub(1)
1101
            )));
1102
        }
1103

1104
        Ok(Relationship {
1105
            snapshot: self.snapshot.clone(),
1106
            rel_index,
1107
            is_outgoing: true, // Always true since we use out_nbrs as canonical
1108
        })
1109
    }
1110

1111
    /// Get a relationship by node pair
1112
    ///
1113
    /// Finds a relationship between two nodes. If multiple relationships exist
1114
    /// between the same nodes, returns the first one found.
1115
    ///
1116
    /// # Arguments
1117
    /// * `start_node` - Source node ID
1118
    /// * `end_node` - Destination node ID
1119
    fn relationship_by_nodes(
1120
        &self,
1121
        start_node: u32,
1122
        end_node: u32,
1123
    ) -> PyResult<Option<Relationship>> {
1124
        // Check if start_node is valid
1125
        if start_node as usize >= self.snapshot.out_offsets.len().saturating_sub(1) {
1126
            return Ok(None);
1127
        }
1128

1129
        let start = self.snapshot.out_offsets[start_node as usize] as usize;
1130
        let end = self.snapshot.out_offsets[start_node as usize + 1] as usize;
1131

1132
        // Search for end_node in the outgoing neighbors of start_node
1133
        for (idx, &nbr) in self.snapshot.out_nbrs[start..end].iter().enumerate() {
1134
            if nbr == end_node {
1135
                return Ok(Some(Relationship {
1136
                    snapshot: self.snapshot.clone(),
1137
                    rel_index: (start + idx) as u32,
1138
                    is_outgoing: true,
1139
                }));
1140
            }
1141
        }
1142

1143
        Ok(None)
1144
    }
1145

1146
    /// Get property value for a node
1147
    fn get_property(&self, node_id: u32, key: String) -> PyResult<Option<PyObject>> {
1148
        let key_id = self.property_key_from_str(&key);
1149
        if key_id.is_none() {
1150
            return Err(PyErr::new::<pyo3::exceptions::PyValueError, _>(format!(
1151
                "Property key '{}' not found",
1152
                key
1153
            )));
1154
        }
1155

1156
        let value_id = self.snapshot.prop(node_id, &key).map(|p| p.value());
1157

1158
        Python::with_gil(|py| {
1159
            Ok(value_id.and_then(|vid| value_id_to_pyobject(py, vid, &self.snapshot.atoms)))
1160
        })
1161
    }
1162

1163
    /// Get nodes with a specific property value, scoped by label
1164
    ///
1165
    /// # Arguments
1166
    /// * `label` - The label to scope the query to
1167
    /// * `key` - The property key
1168
    /// * `value` - The property value to search for
1169
    #[pyo3(signature = (label, key, value))]
1170
    fn nodes_with_property(
1171
        &self,
1172
        label: String,
1173
        key: String,
1174
        value: &Bound<'_, PyAny>,
1175
    ) -> PyResult<Vec<u32>> {
1176
        // Check if label exists - need to do this before calling nodes_with_property()
1177
        // because it returns None for both "label doesn't exist" and "no matches"
1178
        let label_id = self.label_from_str(&label);
1179
        if label_id.is_none() {
1180
            return Err(PyErr::new::<pyo3::exceptions::PyValueError, _>(format!(
1181
                "Label '{}' not found",
1182
                label
1183
            )));
1184
        }
1185

1186
        // Check if property key exists
1187
        let key_id = self.property_key_from_str(&key);
1188
        if key_id.is_none() {
1189
            return Err(PyErr::new::<pyo3::exceptions::PyValueError, _>(format!(
1190
                "Property key '{}' not found",
1191
                key
1192
            )));
1193
        }
1194

1195
        // Both label and key exist, now convert value and query
1196
        let prop_value = py_to_property_value(value)?;
1197
        let value_id = match prop_value {
1198
            rustychickpeas_core::PropertyValue::String(s) => {
1199
                // Need to find string ID
1200
                let sid = self.get_string_id(&s).ok_or_else(|| {
1201
                    PyErr::new::<pyo3::exceptions::PyValueError, _>(format!(
1202
                        "Property value string '{}' not found",
1203
                        s
1204
                    ))
1205
                })?;
1206
                ValueId::Str(sid)
1207
            }
1208
            rustychickpeas_core::PropertyValue::Integer(i) => ValueId::I64(i),
1209
            rustychickpeas_core::PropertyValue::Float(f) => ValueId::from_f64(f),
1210
            rustychickpeas_core::PropertyValue::Boolean(b) => ValueId::Bool(b),
1211
            rustychickpeas_core::PropertyValue::InternedString(_) => {
1212
                return Err(PyErr::new::<pyo3::exceptions::PyValueError, _>(
1213
                    "InternedString not supported in GraphSnapshot queries",
1214
                ));
1215
            }
1216
        };
1217

1218
        // get_nodes_with_property now returns Option<NodeSet> (cloned) instead of Option<&NodeSet>
1219
        // None means no matches (valid), Some means matches found
1220
        if let Some(node_set) = self.snapshot.nodes_with_property(&label, &key, value_id) {
1221
            Ok(node_set.iter().collect())
1222
        } else {
1223
            Ok(Vec::new())
1224
        }
1225
    }
1226

1227
    /// Full-text search: node ids of `label` whose `key` string property contains
1228
    /// every whitespace/punctuation-delimited token in `query` (lowercased,
1229
    /// boolean AND), via the core lazily-built inverted index. Returns the node
1230
    /// ids ascending; empty for an unknown label/key, an empty query, or a token
1231
    /// no document contains. Wraps `GraphSnapshot::full_text_search`.
1232
    fn full_text_search(&self, label: &str, key: &str, query: &str) -> Vec<u32> {
1233
        self.snapshot
1234
            .full_text_search(label, key, query)
1235
            .iter()
1236
            .collect()
1237
    }
1238

1239
    /// Geo search: node ids of `label` whose `(lat_key, lon_key)` coordinates fall
1240
    /// within `km` great-circle kilometres of `(lat, lon)`, via the core geo k-d
1241
    /// tree. Returns the node ids ascending. Wraps
1242
    /// `GraphSnapshot::geo_within_radius`.
1243
    fn geo_within_radius(
1244
        &self,
1245
        label: &str,
1246
        lat_key: &str,
1247
        lon_key: &str,
1248
        lat: f64,
1249
        lon: f64,
1250
        km: f64,
1251
    ) -> Vec<u32> {
1252
        self.snapshot
1253
            .geo_within_radius(label, lat_key, lon_key, lat, lon, km)
1254
            .iter()
1255
            .collect()
1256
    }
1257

1258
    /// Geo search: node ids of `label` whose `(lat_key, lon_key)` coordinates fall
1259
    /// in the lat/lon rectangle with corners `(min_lat, min_lon)` and
1260
    /// `(max_lat, max_lon)` (a `min_lon > max_lon` box crosses the antimeridian),
1261
    /// via the core geo k-d tree. Returns the node ids ascending. Wraps
1262
    /// `GraphSnapshot::geo_within_bbox`.
1263
    #[allow(clippy::too_many_arguments)]
1264
    fn geo_within_bbox(
1265
        &self,
1266
        label: &str,
1267
        lat_key: &str,
1268
        lon_key: &str,
1269
        min_lat: f64,
1270
        min_lon: f64,
1271
        max_lat: f64,
1272
        max_lon: f64,
1273
    ) -> Vec<u32> {
1274
        self.snapshot
1275
            .geo_within_bbox(
1276
                label,
1277
                lat_key,
1278
                lon_key,
1279
                (min_lat, min_lon),
1280
                (max_lat, max_lon),
1281
            )
1282
            .iter()
1283
            .collect()
1284
    }
1285

1286
    /// PageRank after `iterations` synchronous pull updates with damping `damping`
1287
    /// (sinks redistribute their rank uniformly). `directed` picks the forward
1288
    /// direction (outgoing for a directed graph, both for undirected). Returns one
1289
    /// score per node id. Runs in Rust with the GIL released.
1290
    /// Wraps `GraphSnapshot::pagerank`.
1291
    fn pagerank(&self, py: Python<'_>, directed: bool, damping: f64, iterations: u32) -> Vec<f64> {
1292
        let snapshot = self.snapshot.clone();
1293
        py.allow_threads(move || snapshot.pagerank(directed, damping, iterations))
1294
    }
1295

1296
    /// Weakly connected components: each node's label is the smallest node id in its
1297
    /// component (flood undirected rels). One label per node id. GIL released.
1298
    /// Wraps `GraphSnapshot::wcc`.
1299
    fn wcc(&self, py: Python<'_>) -> Vec<u32> {
1300
        let snapshot = self.snapshot.clone();
1301
        py.allow_threads(move || snapshot.wcc())
1302
    }
1303

1304
    /// Community detection by `iterations` rounds of synchronous label propagation
1305
    /// (most-frequent neighbour label, smallest on a tie; in+out counted separately
1306
    /// for a directed graph). `seed` gives explicit initial labels per node id
1307
    /// (default: node ids) — pass original vertex ids to match a vertex-id-keyed
1308
    /// reference. One label per node id. GIL released.
1309
    /// Wraps `GraphSnapshot::cdlp` / `cdlp_seeded`.
1310
    #[pyo3(signature = (directed, iterations, seed=None))]
1311
    fn cdlp(
1312
        &self,
1313
        py: Python<'_>,
1314
        directed: bool,
1315
        iterations: u32,
1316
        seed: Option<Vec<u32>>,
1317
    ) -> Vec<u32> {
1318
        let snapshot = self.snapshot.clone();
1319
        py.allow_threads(move || match seed {
1320
            Some(s) => snapshot.cdlp_seeded(directed, iterations, &s),
1321
            None => snapshot.cdlp(directed, iterations),
1322
        })
1323
    }
1324

1325
    /// Local clustering coefficient per node: rels among each node's undirected
1326
    /// neighbour set over the maximum possible (0 when degree <= 1). `directed`
1327
    /// picks the forward direction for the rel count. One value per node id. GIL
1328
    /// released. Wraps `GraphSnapshot::lcc`.
1329
    fn lcc(&self, py: Python<'_>, directed: bool) -> Vec<f64> {
1330
        let snapshot = self.snapshot.clone();
1331
        py.allow_threads(move || snapshot.lcc(directed))
1332
    }
1333

1334
    /// Single-source shortest paths from `source` over forward rels with additive
1335
    /// weights from the `weight_key` rel property (`None` = unit weights);
1336
    /// unreachable nodes get `inf`. One distance per node id. GIL released.
1337
    /// Wraps `GraphSnapshot::sssp`.
1338
    #[pyo3(signature = (source, directed, weight_key=None))]
1339
    fn sssp(
1340
        &self,
1341
        py: Python<'_>,
1342
        source: u32,
1343
        directed: bool,
1344
        weight_key: Option<String>,
1345
    ) -> Vec<f64> {
1346
        let snapshot = self.snapshot.clone();
1347
        py.allow_threads(move || snapshot.sssp(source, directed, weight_key.as_deref()))
1348
    }
1349

1350
    /// Seeded co-occurrence — one-mode / bipartite projection by shared neighbour.
1351
    /// From `seed`, over relationship `rel`, the nodes that share a `rel`-neighbour
1352
    /// with `seed` (seed -> shared centers -> their other `rel`-neighbours), `seed`
1353
    /// excluded. `weight="count"` (default) weighs each co-occurring node by its
1354
    /// shared-center count; `weight="distinct"` (with `distinct_key`) by the number
1355
    /// of distinct values of that property over the shared centers (e.g. distinct
1356
    /// days). Returns `{other: weight}`. GIL released.
1357
    /// Wraps `GraphSnapshot::co_occurring`.
1358
    #[pyo3(signature = (seed, rel, direction, weight=None, distinct_key=None))]
1359
    fn co_occurring(
1360
        &self,
1361
        py: Python<'_>,
1362
        seed: u32,
1363
        rel: String,
1364
        direction: Direction,
1365
        weight: Option<String>,
1366
        distinct_key: Option<String>,
1367
    ) -> PyResult<std::collections::HashMap<u32, u64>> {
1368
        // Validate the weight mode up front (under the GIL), then run released.
1369
        let mode = match weight.as_deref() {
1370
            None | Some("count") => None,
1371
            Some("distinct") => Some(distinct_key.ok_or_else(|| {
1372
                PyErr::new::<pyo3::exceptions::PyValueError, _>(
1373
                    "weight='distinct' requires distinct_key",
1374
                )
1375
            })?),
1376
            Some(other) => {
1377
                return Err(PyErr::new::<pyo3::exceptions::PyValueError, _>(format!(
1378
                    "unknown weight '{}' (use 'count' or 'distinct')",
1379
                    other
1380
                )))
1381
            }
1382
        };
1383
        let snapshot = self.snapshot.clone();
1384
        let dir: rustychickpeas_core::Direction = direction.into();
1385
        let out: std::collections::HashMap<u32, u64> = py.allow_threads(move || {
1386
            let w = match &mode {
1387
                None => rustychickpeas_core::CoWeight::Count,
1388
                Some(k) => rustychickpeas_core::CoWeight::Distinct(k.as_str()),
1389
            };
1390
            // core returns a hashbrown map; collect into std so PyO3 hands back a dict.
1391
            snapshot
1392
                .co_occurring(seed, &rel, dir, w)
1393
                .into_iter()
1394
                .collect()
1395
        });
1396
        Ok(out)
1397
    }
1398

1399
    /// Get the version of this snapshot
1400
    fn version(&self) -> PyResult<Option<String>> {
1401
        Ok(self.snapshot.version().map(|s| s.to_string()))
1402
    }
1403

1404
    /// Create a GraphSnapshot from Parquet files using GraphBuilder
1405
    #[staticmethod]
1406
    #[allow(clippy::too_many_arguments)]
1407
    #[pyo3(signature = (nodes_path=None, relationships_path=None, node_id_column=None, label_columns=None, node_property_columns=None, start_node_column=None, end_node_column=None, rel_type_column=None, rel_property_columns=None))]
1408
    fn read_from_parquet(
1409
        nodes_path: Option<String>,
1410
        relationships_path: Option<String>,
1411
        node_id_column: Option<String>,
1412
        label_columns: Option<Vec<String>>,
1413
        node_property_columns: Option<Vec<String>>,
1414
        start_node_column: Option<String>,
1415
        end_node_column: Option<String>,
1416
        rel_type_column: Option<String>,
1417
        rel_property_columns: Option<Vec<String>>,
1418
    ) -> PyResult<GraphSnapshot> {
1419
        let label_cols = label_columns
1420
            .as_ref()
1421
            .map(|cols| cols.iter().map(|s| s.as_str()).collect());
1422
        let node_prop_cols = node_property_columns
1423
            .as_ref()
1424
            .map(|cols| cols.iter().map(|s| s.as_str()).collect());
1425
        let rel_prop_cols = rel_property_columns
1426
            .as_ref()
1427
            .map(|cols| cols.iter().map(|s| s.as_str()).collect());
1428

1429
        let snapshot = CoreGraphSnapshot::from_parquet(
1430
            nodes_path.as_deref(),
1431
            relationships_path.as_deref(),
1432
            node_id_column.as_deref(),
1433
            label_cols,
1434
            node_prop_cols,
1435
            start_node_column.as_deref(),
1436
            end_node_column.as_deref(),
1437
            rel_type_column.as_deref(),
1438
            rel_prop_cols,
1439
        )
1440
        .map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;
1441

1442
        Ok(GraphSnapshot::new(snapshot))
1443
    }
1444

1445
    /// Write this snapshot to an RCPG file on disk. With `topology_only=True`,
1446
    /// omit the property columns for a lean, traversal-only file (per-node data
1447
    /// is expected to live in a record store instead).
1448
    #[pyo3(signature = (path, topology_only=false))]
1449
    fn write_rcpg(&self, path: String, topology_only: bool) -> PyResult<()> {
1450
        if topology_only {
1451
            use rustychickpeas_core::format::rcpg::WriteOptions;
1452
            use std::io::Write;
1453
            let mut file = std::io::BufWriter::new(
1454
                std::fs::File::create(&path)
1455
                    .map_err(|e| PyErr::new::<pyo3::exceptions::PyIOError, _>(e.to_string()))?,
1456
            );
1457
            self.snapshot
1458
                .write_rcpg_with(&mut file, &WriteOptions::topology_only())
1459
                .map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;
1460
            file.flush()
1461
                .map_err(|e| PyErr::new::<pyo3::exceptions::PyIOError, _>(e.to_string()))?;
1462
        } else {
1463
            self.snapshot
1464
                .write_rcpg_file(&path)
1465
                .map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;
1466
        }
1467
        Ok(())
1468
    }
1469

1470
    /// Read a snapshot from an RCPG file on disk (the property index rebuilds
1471
    /// lazily on first use).
1472
    #[staticmethod]
1473
    fn read_rcpg(path: String) -> PyResult<GraphSnapshot> {
1474
        let snapshot = CoreGraphSnapshot::read_rcpg_file(&path)
1475
            .map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;
1476
        Ok(GraphSnapshot::new(snapshot))
1477
    }
1478

1479
    /// Bidirectional BFS to find paths between source and target node sets
1480
    ///
1481
    /// Performs BFS from both source and target nodes simultaneously, meeting in the middle.
1482
    /// Returns the intersection of nodes and relationships that lie on paths between the sets.
1483
    ///
1484
    /// # Arguments
1485
    /// * `source_nodes` - List of starting node IDs for forward traversal
1486
    /// * `target_nodes` - List of starting node IDs for backward traversal
1487
    /// * `direction` - Direction of traversal (Direction.Outgoing, Direction.Incoming, or Direction.Both)
1488
    ///   - Outgoing: Forward search uses outgoing rels, backward uses incoming (default for finding paths from source to target)
1489
    ///   - Incoming: Forward search uses incoming rels, backward uses outgoing (reverse direction)
1490
    ///   - Both: Both searches use both directions (bidirectional traversal)
1491
    /// * `rel_types` - Optional list of relationship type names to filter by
1492
    /// * `node_filter` - Optional callable that takes (node_id: int) and returns bool.
1493
    ///   Returns True to include/continue from a node.
1494
    /// * `rel_filter` - Optional callable that takes (from_node: int, to_node: int, rel_type: str, csr_pos: int) and returns bool.
1495
    ///   Returns True to follow a relationship.
1496
    /// * `max_depth` - Optional maximum depth for each direction (default: no limit)
1497
    ///
1498
    /// # Returns
1499
    /// A tuple `(node_ids, rel_csr_positions)` where:
1500
    /// - `node_ids`: List of node IDs on paths between source and target
1501
    /// - `rel_csr_positions`: List of relationship CSR positions on paths between source and target
1502
    ///
1503
    /// # Examples
1504
    /// ```python
1505
    /// from rustychickpeas import Direction
1506
    ///
1507
    /// # Simple bidirectional search (default: Outgoing)
1508
    /// nodes, rels = snapshot.bidirectional_bfs([0, 1], [10, 11], Direction.Outgoing)
1509
    ///
1510
    /// # With relationship type filter
1511
    /// nodes, rels = snapshot.bidirectional_bfs(
1512
    ///     [0, 1], [10, 11],
1513
    ///     Direction.Outgoing,
1514
    ///     rel_types=["KNOWS", "WORKS_WITH"]
1515
    /// )
1516
    ///
1517
    /// # Bidirectional traversal (both directions)
1518
    /// nodes, rels = snapshot.bidirectional_bfs(
1519
    ///     [0, 1], [10, 11],
1520
    ///     Direction.Both
1521
    /// )
1522
    ///
1523
    /// # With node filter (only "Person" nodes)
1524
    /// def node_filter(node_id):
1525
    ///     return "Person" in snapshot.node_labels(node_id)
1526
    ///
1527
    /// nodes, rels = snapshot.bidirectional_bfs(
1528
    ///     [0, 1], [10, 11],
1529
    ///     Direction.Outgoing,
1530
    ///     node_filter=node_filter
1531
    /// )
1532
    /// ```
1533
    #[pyo3(signature = (source_nodes, target_nodes, direction, *, rel_types=None, node_filter=None, rel_filter=None, max_depth=None))]
1534
    #[allow(clippy::too_many_arguments)]
1535
    fn bidirectional_bfs(
1536
        &self,
1537
        py: Python<'_>,
1538
        source_nodes: Vec<u32>,
1539
        target_nodes: Vec<u32>,
1540
        direction: Direction,
1541
        rel_types: Option<Vec<String>>,
1542
        node_filter: Option<PyObject>,
1543
        rel_filter: Option<PyObject>,
1544
        max_depth: Option<u32>,
1545
    ) -> PyResult<(Vec<u32>, Vec<u32>)> {
1546
        let source_set = NodeSet::from(RoaringBitmap::from_iter(source_nodes.iter().copied()));
1547
        let target_set = NodeSet::from(RoaringBitmap::from_iter(target_nodes.iter().copied()));
1548

1549
        use rustychickpeas_core::types::Direction as CoreDirection;
1550
        let rust_direction = match direction {
1551
            Direction::Outgoing => CoreDirection::Outgoing,
1552
            Direction::Incoming => CoreDirection::Incoming,
1553
            Direction::Both => CoreDirection::Both,
1554
        };
1555

1556
        let rel_types_str: Option<Vec<&str>> = rel_types
1557
            .as_ref()
1558
            .map(|types| types.iter().map(|s| s.as_str()).collect());
1559

1560
        // Error cell to capture Python exceptions from filter callbacks
1561
        let error_cell: Arc<Mutex<Option<PyErr>>> = Arc::new(Mutex::new(None));
1562

1563
        // Release the GIL for the traversal; filter callbacks re-acquire it
1564
        // per invocation via Python::with_gil.
1565
        let (node_bitmap, rel_bitmap) = py.allow_threads(|| {
1566
            if let (Some(nf_obj), Some(rf_obj)) = (node_filter.as_ref(), rel_filter.as_ref()) {
1567
                let nf_err = error_cell.clone();
1568
                let rf_err = error_cell.clone();
1569
                self.snapshot.bidirectional_bfs(
1570
                    &source_set,
1571
                    &target_set,
1572
                    rust_direction,
1573
                    rel_types_str.as_deref(),
1574
                    Some(move |node_id: u32, _snapshot: &CoreGraphSnapshot| -> bool {
1575
                        if nf_err
1576
                            .lock()
1577
                            .unwrap_or_else(PoisonError::into_inner)
1578
                            .is_some()
1579
                        {
1580
                            return false;
1581
                        }
1582
                        Python::with_gil(|py| {
1583
                            match nf_obj
1584
                                .call1(py, (node_id,))
1585
                                .and_then(|r| r.extract::<bool>(py))
1586
                            {
1587
                                Ok(v) => v,
1588
                                Err(e) => {
1589
                                    *nf_err.lock().unwrap_or_else(PoisonError::into_inner) =
1590
                                        Some(e);
1591
                                    false
1592
                                }
1593
                            }
1594
                        })
1595
                    }),
1596
                    Some(
1597
                        move |from: u32,
1598
                              to: u32,
1599
                              rel_type: RelationshipType,
1600
                              csr_pos: u32,
1601
                              snapshot: &CoreGraphSnapshot|
1602
                              -> bool {
1603
                            if rf_err
1604
                                .lock()
1605
                                .unwrap_or_else(PoisonError::into_inner)
1606
                                .is_some()
1607
                            {
1608
                                return false;
1609
                            }
1610
                            Python::with_gil(|py| {
1611
                                let rel_type_str = snapshot
1612
                                    .resolve_string(rel_type.id())
1613
                                    .unwrap_or("")
1614
                                    .to_string();
1615
                                match rf_obj
1616
                                    .call1(py, (from, to, rel_type_str, csr_pos))
1617
                                    .and_then(|r| r.extract::<bool>(py))
1618
                                {
1619
                                    Ok(v) => v,
1620
                                    Err(e) => {
1621
                                        *rf_err.lock().unwrap_or_else(PoisonError::into_inner) =
1622
                                            Some(e);
1623
                                        false
1624
                                    }
1625
                                }
1626
                            })
1627
                        },
1628
                    ),
1629
                    max_depth,
1630
                )
1631
            } else if let Some(nf_obj) = node_filter.as_ref() {
1632
                let nf_err = error_cell.clone();
1633
                type RelFilter = fn(u32, u32, RelationshipType, u32, &CoreGraphSnapshot) -> bool;
1634
                self.snapshot.bidirectional_bfs::<_, RelFilter>(
1635
                    &source_set,
1636
                    &target_set,
1637
                    rust_direction,
1638
                    rel_types_str.as_deref(),
1639
                    Some(move |node_id: u32, _snapshot: &CoreGraphSnapshot| -> bool {
1640
                        if nf_err
1641
                            .lock()
1642
                            .unwrap_or_else(PoisonError::into_inner)
1643
                            .is_some()
1644
                        {
1645
                            return false;
1646
                        }
1647
                        Python::with_gil(|py| {
1648
                            match nf_obj
1649
                                .call1(py, (node_id,))
1650
                                .and_then(|r| r.extract::<bool>(py))
1651
                            {
1652
                                Ok(v) => v,
1653
                                Err(e) => {
1654
                                    *nf_err.lock().unwrap_or_else(PoisonError::into_inner) =
1655
                                        Some(e);
1656
                                    false
1657
                                }
1658
                            }
1659
                        })
1660
                    }),
1661
                    None,
1662
                    max_depth,
1663
                )
1664
            } else if let Some(rf_obj) = rel_filter.as_ref() {
1665
                let rf_err = error_cell.clone();
1666
                type NodeFilter = fn(u32, &CoreGraphSnapshot) -> bool;
1667
                self.snapshot.bidirectional_bfs::<NodeFilter, _>(
1668
                    &source_set,
1669
                    &target_set,
1670
                    rust_direction,
1671
                    rel_types_str.as_deref(),
1672
                    None,
1673
                    Some(
1674
                        move |from: u32,
1675
                              to: u32,
1676
                              rel_type: RelationshipType,
1677
                              csr_pos: u32,
1678
                              snapshot: &CoreGraphSnapshot|
1679
                              -> bool {
1680
                            if rf_err
1681
                                .lock()
1682
                                .unwrap_or_else(PoisonError::into_inner)
1683
                                .is_some()
1684
                            {
1685
                                return false;
1686
                            }
1687
                            Python::with_gil(|py| {
1688
                                let rel_type_str = snapshot
1689
                                    .resolve_string(rel_type.id())
1690
                                    .unwrap_or("")
1691
                                    .to_string();
1692
                                match rf_obj
1693
                                    .call1(py, (from, to, rel_type_str, csr_pos))
1694
                                    .and_then(|r| r.extract::<bool>(py))
1695
                                {
1696
                                    Ok(v) => v,
1697
                                    Err(e) => {
1698
                                        *rf_err.lock().unwrap_or_else(PoisonError::into_inner) =
1699
                                            Some(e);
1700
                                        false
1701
                                    }
1702
                                }
1703
                            })
1704
                        },
1705
                    ),
1706
                    max_depth,
1707
                )
1708
            } else {
1709
                type NodeFilter = fn(u32, &CoreGraphSnapshot) -> bool;
1710
                type RelFilter = fn(u32, u32, RelationshipType, u32, &CoreGraphSnapshot) -> bool;
1711
                self.snapshot.bidirectional_bfs::<NodeFilter, RelFilter>(
1712
                    &source_set,
1713
                    &target_set,
1714
                    rust_direction,
1715
                    rel_types_str.as_deref(),
1716
                    None,
1717
                    None,
1718
                    max_depth,
1719
                )
1720
            }
1721
        });
1722

1723
        // Propagate any Python exception captured during BFS
1724
        if let Some(err) = error_cell
1725
            .lock()
1726
            .unwrap_or_else(PoisonError::into_inner)
1727
            .take()
1728
        {
1729
            return Err(err);
1730
        }
1731

1732
        Ok((node_bitmap.iter().collect(), rel_bitmap.iter().collect()))
1733
    }
1734

1735
    /// BFS traversal from a set of starting nodes
1736
    ///
1737
    /// Performs BFS from the starting nodes, following rels in the specified direction.
1738
    /// Returns all nodes and relationships visited during the traversal.
1739
    ///
1740
    /// # Arguments
1741
    /// * `start_nodes` - List of starting node IDs
1742
    /// * `direction` - Direction of traversal (Direction.Outgoing, Direction.Incoming, or Direction.Both)
1743
    ///   - Outgoing: Follow outgoing rels
1744
    ///   - Incoming: Follow incoming rels
1745
    ///   - Both: Follow both outgoing and incoming rels
1746
    /// * `rel_types` - Optional list of relationship type names to filter by
1747
    /// * `node_filter` - Optional callable that takes (node_id: int) and returns bool.
1748
    ///   Returns True to include/continue from a node.
1749
    /// * `rel_filter` - Optional callable that takes (from_node: int, to_node: int, rel_type: str, csr_pos: int) and returns bool.
1750
    ///   Returns True to follow a relationship.
1751
    /// * `max_depth` - Optional maximum depth (default: no limit)
1752
    ///
1753
    /// # Returns
1754
    /// A tuple `(node_ids, rel_csr_positions)` where:
1755
    /// - `node_ids`: List of node IDs visited during traversal
1756
    /// - `rel_csr_positions`: List of relationship CSR positions traversed
1757
    ///
1758
    /// # Examples
1759
    /// ```python
1760
    /// from rustychickpeas import Direction
1761
    ///
1762
    /// # Simple BFS from a single node
1763
    /// nodes, rels = snapshot.bfs([0], Direction.Outgoing)
1764
    ///
1765
    /// # BFS with relationship type filter
1766
    /// nodes, rels = snapshot.bfs(
1767
    ///     [0], Direction.Outgoing,
1768
    ///     rel_types=["KNOWS", "WORKS_WITH"]
1769
    /// )
1770
    ///
1771
    /// # BFS with max depth
1772
    /// nodes, rels = snapshot.bfs(
1773
    ///     [0], Direction.Outgoing,
1774
    ///     max_depth=3
1775
    /// )
1776
    ///
1777
    /// # BFS with node filter (only "Person" nodes)
1778
    /// def node_filter(node_id):
1779
    ///     return "Person" in snapshot.node_labels(node_id)
1780
    ///
1781
    /// nodes, rels = snapshot.bfs(
1782
    ///     [0], Direction.Outgoing,
1783
    ///     node_filter=node_filter
1784
    /// )
1785
    /// ```
1786
    #[pyo3(signature = (start_nodes, direction, *, rel_types=None, node_filter=None, rel_filter=None, max_depth=None))]
1787
    #[allow(clippy::too_many_arguments)]
1788
    fn bfs(
1789
        &self,
1790
        py: Python<'_>,
1791
        start_nodes: Vec<u32>,
1792
        direction: Direction,
1793
        rel_types: Option<Vec<String>>,
1794
        node_filter: Option<PyObject>,
1795
        rel_filter: Option<PyObject>,
1796
        max_depth: Option<u32>,
1797
    ) -> PyResult<(Vec<u32>, Vec<u32>)> {
1798
        let start_set = NodeSet::from(RoaringBitmap::from_iter(start_nodes.iter().copied()));
1799

1800
        use rustychickpeas_core::types::Direction as CoreDirection;
1801
        let rust_direction = match direction {
1802
            Direction::Outgoing => CoreDirection::Outgoing,
1803
            Direction::Incoming => CoreDirection::Incoming,
1804
            Direction::Both => CoreDirection::Both,
1805
        };
1806

1807
        let rel_types_str: Option<Vec<&str>> = rel_types
1808
            .as_ref()
1809
            .map(|types| types.iter().map(|s| s.as_str()).collect());
1810

1811
        // Error cell to capture Python exceptions from filter callbacks
1812
        let error_cell: Arc<Mutex<Option<PyErr>>> = Arc::new(Mutex::new(None));
1813

1814
        // Release the GIL for the traversal; filter callbacks re-acquire it
1815
        // per invocation via Python::with_gil.
1816
        let (node_bitmap, rel_bitmap) = py.allow_threads(|| {
1817
            if let (Some(nf_obj), Some(rf_obj)) = (node_filter.as_ref(), rel_filter.as_ref()) {
1818
                let nf_err = error_cell.clone();
1819
                let rf_err = error_cell.clone();
1820
                self.snapshot.bfs(
1821
                    &start_set,
1822
                    rust_direction,
1823
                    rel_types_str.as_deref(),
1824
                    Some(move |node_id: u32, _snapshot: &CoreGraphSnapshot| -> bool {
1825
                        if nf_err
1826
                            .lock()
1827
                            .unwrap_or_else(PoisonError::into_inner)
1828
                            .is_some()
1829
                        {
1830
                            return false;
1831
                        }
1832
                        Python::with_gil(|py| {
1833
                            match nf_obj
1834
                                .call1(py, (node_id,))
1835
                                .and_then(|r| r.extract::<bool>(py))
1836
                            {
1837
                                Ok(v) => v,
1838
                                Err(e) => {
1839
                                    *nf_err.lock().unwrap_or_else(PoisonError::into_inner) =
1840
                                        Some(e);
1841
                                    false
1842
                                }
1843
                            }
1844
                        })
1845
                    }),
1846
                    Some(
1847
                        move |from: u32,
1848
                              to: u32,
1849
                              rel_type: RelationshipType,
1850
                              csr_pos: u32,
1851
                              snapshot: &CoreGraphSnapshot|
1852
                              -> bool {
1853
                            if rf_err
1854
                                .lock()
1855
                                .unwrap_or_else(PoisonError::into_inner)
1856
                                .is_some()
1857
                            {
1858
                                return false;
1859
                            }
1860
                            Python::with_gil(|py| {
1861
                                let rel_type_str = snapshot
1862
                                    .resolve_string(rel_type.id())
1863
                                    .unwrap_or("")
1864
                                    .to_string();
1865
                                match rf_obj
1866
                                    .call1(py, (from, to, rel_type_str, csr_pos))
1867
                                    .and_then(|r| r.extract::<bool>(py))
1868
                                {
1869
                                    Ok(v) => v,
1870
                                    Err(e) => {
1871
                                        *rf_err.lock().unwrap_or_else(PoisonError::into_inner) =
1872
                                            Some(e);
1873
                                        false
1874
                                    }
1875
                                }
1876
                            })
1877
                        },
1878
                    ),
1879
                    max_depth,
1880
                )
1881
            } else if let Some(nf_obj) = node_filter.as_ref() {
1882
                let nf_err = error_cell.clone();
1883
                type RelFilter = fn(u32, u32, RelationshipType, u32, &CoreGraphSnapshot) -> bool;
1884
                self.snapshot.bfs::<_, RelFilter>(
1885
                    &start_set,
1886
                    rust_direction,
1887
                    rel_types_str.as_deref(),
1888
                    Some(move |node_id: u32, _snapshot: &CoreGraphSnapshot| -> bool {
1889
                        if nf_err
1890
                            .lock()
1891
                            .unwrap_or_else(PoisonError::into_inner)
1892
                            .is_some()
1893
                        {
1894
                            return false;
1895
                        }
1896
                        Python::with_gil(|py| {
1897
                            match nf_obj
1898
                                .call1(py, (node_id,))
1899
                                .and_then(|r| r.extract::<bool>(py))
1900
                            {
1901
                                Ok(v) => v,
1902
                                Err(e) => {
1903
                                    *nf_err.lock().unwrap_or_else(PoisonError::into_inner) =
1904
                                        Some(e);
1905
                                    false
1906
                                }
1907
                            }
1908
                        })
1909
                    }),
1910
                    None,
1911
                    max_depth,
1912
                )
1913
            } else if let Some(rf_obj) = rel_filter.as_ref() {
1914
                let rf_err = error_cell.clone();
1915
                type NodeFilter = fn(u32, &CoreGraphSnapshot) -> bool;
1916
                self.snapshot.bfs::<NodeFilter, _>(
1917
                    &start_set,
1918
                    rust_direction,
1919
                    rel_types_str.as_deref(),
1920
                    None,
1921
                    Some(
1922
                        move |from: u32,
1923
                              to: u32,
1924
                              rel_type: RelationshipType,
1925
                              csr_pos: u32,
1926
                              snapshot: &CoreGraphSnapshot|
1927
                              -> bool {
1928
                            if rf_err
1929
                                .lock()
1930
                                .unwrap_or_else(PoisonError::into_inner)
1931
                                .is_some()
1932
                            {
1933
                                return false;
1934
                            }
1935
                            Python::with_gil(|py| {
1936
                                let rel_type_str = snapshot
1937
                                    .resolve_string(rel_type.id())
1938
                                    .unwrap_or("")
1939
                                    .to_string();
1940
                                match rf_obj
1941
                                    .call1(py, (from, to, rel_type_str, csr_pos))
1942
                                    .and_then(|r| r.extract::<bool>(py))
1943
                                {
1944
                                    Ok(v) => v,
1945
                                    Err(e) => {
1946
                                        *rf_err.lock().unwrap_or_else(PoisonError::into_inner) =
1947
                                            Some(e);
1948
                                        false
1949
                                    }
1950
                                }
1951
                            })
1952
                        },
1953
                    ),
1954
                    max_depth,
1955
                )
1956
            } else {
1957
                type NodeFilter = fn(u32, &CoreGraphSnapshot) -> bool;
1958
                type RelFilter = fn(u32, u32, RelationshipType, u32, &CoreGraphSnapshot) -> bool;
1959
                self.snapshot.bfs::<NodeFilter, RelFilter>(
1960
                    &start_set,
1961
                    rust_direction,
1962
                    rel_types_str.as_deref(),
1963
                    None,
1964
                    None,
1965
                    max_depth,
1966
                )
1967
            }
1968
        });
1969

1970
        // Propagate any Python exception captured during BFS
1971
        if let Some(err) = error_cell
1972
            .lock()
1973
            .unwrap_or_else(PoisonError::into_inner)
1974
            .take()
1975
        {
1976
            return Err(err);
1977
        }
1978

1979
        Ok((node_bitmap.iter().collect(), rel_bitmap.iter().collect()))
1980
    }
1981

1982
    /// Shortest hop-distance from `start` to every node reachable along `rel_types`
1983
    /// in `direction`, bounded to `max_depth` hops. Returns `{node_id: distance}`
1984
    /// (start is distance 0); `rel_types=None` follows every type. The typed
1985
    /// bounded BFS behind hop-distance filters (e.g. "friends 3..4 hops away").
1986
    #[pyo3(signature = (start, direction, *, rel_types=None, max_depth=None))]
1987
    fn bfs_distances(
1988
        &self,
1989
        py: Python<'_>,
1990
        start: u32,
1991
        direction: Direction,
1992
        rel_types: Option<Vec<String>>,
1993
        max_depth: Option<u32>,
1994
    ) -> std::collections::HashMap<u32, u32> {
1995
        let snapshot = self.snapshot.clone();
1996
        let dir: rustychickpeas_core::types::Direction = direction.into();
1997
        py.allow_threads(move || {
1998
            let types: Vec<&str> = rel_types
1999
                .as_ref()
2000
                .map(|t| t.iter().map(|s| s.as_str()).collect())
2001
                .unwrap_or_default();
2002
            snapshot
2003
                .bfs_distances(start, dir, types.as_slice(), max_depth)
2004
                .into_iter()
2005
                .collect()
2006
        })
2007
    }
2008

2009
    /// Check if a path exists between two nodes
2010
    #[pyo3(signature = (from_node, to_node, direction, *, rel_types=None, max_depth=None))]
2011
    fn can_reach(
2012
        &self,
2013
        from_node: u32,
2014
        to_node: u32,
2015
        direction: Direction,
2016
        rel_types: Option<Vec<String>>,
2017
        max_depth: Option<usize>,
2018
    ) -> PyResult<bool> {
2019
        if from_node >= self.snapshot.n_nodes {
2020
            return Err(PyErr::new::<pyo3::exceptions::PyValueError, _>(format!(
2021
                "from_node {} out of range (max: {})",
2022
                from_node,
2023
                self.snapshot.n_nodes.saturating_sub(1)
2024
            )));
2025
        }
2026
        if to_node >= self.snapshot.n_nodes {
2027
            return Err(PyErr::new::<pyo3::exceptions::PyValueError, _>(format!(
2028
                "to_node {} out of range (max: {})",
2029
                to_node,
2030
                self.snapshot.n_nodes.saturating_sub(1)
2031
            )));
2032
        }
2033

2034
        use rustychickpeas_core::types::Direction as CoreDirection;
2035
        let rust_direction = match direction {
2036
            Direction::Outgoing => CoreDirection::Outgoing,
2037
            Direction::Incoming => CoreDirection::Incoming,
2038
            Direction::Both => CoreDirection::Both,
2039
        };
2040

2041
        let rel_types_str: Option<Vec<&str>> = rel_types
2042
            .as_ref()
2043
            .map(|types| types.iter().map(|s| s.as_str()).collect());
2044

2045
        // can_reach takes Option<u32> for max_depth
2046
        let max_depth_u32 = max_depth.map(|d| d as u32);
2047

2048
        Ok(self.snapshot.can_reach(
2049
            from_node,
2050
            to_node,
2051
            rust_direction,
2052
            rel_types_str.as_deref(),
2053
            max_depth_u32,
2054
        ))
2055
    }
2056
}
2057

2058
/// A dense property column exposed to Python as a self-describing, buffer-protocol
2059
/// array (its dtype is intrinsic — no `.i64()` narrowing). Built by
2060
/// [`GraphSnapshot::column`]; holds an `Arc` to the snapshot so the zero-copy
2061
/// buffer it hands out (numpy / pyarrow / memoryview) stays valid for its lifetime.
2062
#[pyclass(name = "Column")]
2063
pub struct Column {
2064
    snapshot: Arc<CoreGraphSnapshot>,
2065
    key: String,
2066
    dtype: ColumnDtype,
2067
    len: usize,
2068
    itemsize: isize,
2069
    // One-element shape/strides the buffer view points at (must outlive the view;
2070
    // they live in this object, which view.obj keeps alive).
2071
    shape: [ffi::Py_ssize_t; 1],
2072
    strides: [ffi::Py_ssize_t; 1],
2073
    // Booleans are bit-packed in core; expanded to one 0/1 byte per node so the
2074
    // buffer has a standard layout. `None` (zero-copy) for the other dtypes.
2075
    bool_bytes: Option<Vec<u8>>,
2076
}
2077

2078
impl Column {
2079
    /// Build a Column for a dense node column, or `None` if absent / not dense.
2080
    fn build(snapshot: Arc<CoreGraphSnapshot>, key: &str) -> Option<Column> {
2081
        let col = snapshot.col(key)?;
2082
        let dtype = col.dtype();
2083
        let (len, itemsize, bool_bytes) = match dtype {
2084
            ColumnDtype::I64 => (col.i64().as_slice()?.len(), 8isize, None),
2085
            ColumnDtype::F64 => (col.f64().as_slice()?.len(), 8, None),
2086
            ColumnDtype::Str => (col.str().as_ids()?.len(), 4, None),
2087
            ColumnDtype::Bool => {
2088
                let bytes: Vec<u8> = col.bool().as_slice()?.iter().map(|b| *b as u8).collect();
2089
                (bytes.len(), 1, Some(bytes))
2090
            }
2091
        };
2092
        Some(Column {
2093
            snapshot,
2094
            key: key.to_string(),
2095
            dtype,
2096
            len,
2097
            itemsize,
2098
            shape: [len as ffi::Py_ssize_t],
2099
            strides: [itemsize as ffi::Py_ssize_t],
2100
            bool_bytes,
2101
        })
2102
    }
2103

2104
    /// Raw data pointer + struct-format char for the buffer view. The pointer is
2105
    /// into the immutable snapshot (or `bool_bytes`) and is stable for the lifetime.
2106
    fn buffer_ptr_format(&self) -> (*const u8, &'static [u8]) {
2107
        match self.dtype {
2108
            ColumnDtype::I64 => {
2109
                let s = self
2110
                    .snapshot
2111
                    .col(&self.key)
2112
                    .unwrap()
2113
                    .i64()
2114
                    .as_slice()
2115
                    .unwrap();
2116
                (s.as_ptr() as *const u8, b"q\0")
2117
            }
2118
            ColumnDtype::F64 => {
2119
                let s = self
2120
                    .snapshot
2121
                    .col(&self.key)
2122
                    .unwrap()
2123
                    .f64()
2124
                    .as_slice()
2125
                    .unwrap();
2126
                (s.as_ptr() as *const u8, b"d\0")
2127
            }
2128
            ColumnDtype::Str => {
2129
                let s = self
2130
                    .snapshot
2131
                    .col(&self.key)
2132
                    .unwrap()
2133
                    .str()
2134
                    .as_ids()
2135
                    .unwrap();
2136
                (s.as_ptr() as *const u8, b"I\0")
2137
            }
2138
            ColumnDtype::Bool => (self.bool_bytes.as_ref().unwrap().as_ptr(), b"B\0"),
2139
        }
2140
    }
2141
}
2142

2143
#[pymethods]
2144
impl Column {
2145
    /// The numpy/struct dtype name: 'int64' | 'float64' | 'bool' | 'string'.
2146
    #[getter]
2147
    fn dtype(&self) -> &'static str {
2148
        match self.dtype {
2149
            ColumnDtype::I64 => "int64",
2150
            ColumnDtype::F64 => "float64",
2151
            ColumnDtype::Bool => "bool",
2152
            ColumnDtype::Str => "string",
2153
        }
2154
    }
2155

2156
    fn __len__(&self) -> usize {
2157
        self.len
2158
    }
2159

2160
    fn __repr__(&self) -> String {
2161
        format!(
2162
            "Column(key='{}', dtype='{}', len={})",
2163
            self.key,
2164
            self.dtype(),
2165
            self.len
2166
        )
2167
    }
2168

2169
    /// The column as a plain Python list. String columns resolve interned ids to
2170
    /// `str`; numeric/bool columns return `int` / `float` / `bool`.
2171
    fn to_pylist(&self, py: Python<'_>) -> PyResult<PyObject> {
2172
        match self.dtype {
2173
            ColumnDtype::I64 => self
2174
                .snapshot
2175
                .col(&self.key)
2176
                .unwrap()
2177
                .i64()
2178
                .as_slice()
2179
                .unwrap()
2180
                .to_vec()
2181
                .into_py_any(py),
2182
            ColumnDtype::F64 => self
2183
                .snapshot
2184
                .col(&self.key)
2185
                .unwrap()
2186
                .f64()
2187
                .as_slice()
2188
                .unwrap()
2189
                .to_vec()
2190
                .into_py_any(py),
2191
            ColumnDtype::Bool => {
2192
                let v: Vec<bool> = self
2193
                    .bool_bytes
2194
                    .as_ref()
2195
                    .unwrap()
2196
                    .iter()
2197
                    .map(|&b| b != 0)
2198
                    .collect();
2199
                v.into_py_any(py)
2200
            }
2201
            ColumnDtype::Str => {
2202
                let ids = self
2203
                    .snapshot
2204
                    .col(&self.key)
2205
                    .unwrap()
2206
                    .str()
2207
                    .as_ids()
2208
                    .unwrap();
2209
                let v: Vec<&str> = ids
2210
                    .iter()
2211
                    .map(|&id| self.snapshot.resolve_string(id).unwrap_or(""))
2212
                    .collect();
2213
                v.into_py_any(py)
2214
            }
2215
        }
2216
    }
2217

2218
    /// Buffer protocol: expose the dense bytes zero-copy (read-only, 1-D,
2219
    /// C-contiguous). `view.obj` takes a new reference to this Column so the backing
2220
    /// memory stays alive while the view is held.
2221
    unsafe fn __getbuffer__(
2222
        slf: PyRef<'_, Self>,
2223
        view: *mut ffi::Py_buffer,
2224
        flags: c_int,
2225
    ) -> PyResult<()> {
2226
        if view.is_null() {
2227
            return Err(pyo3::exceptions::PyBufferError::new_err("view is null"));
2228
        }
2229
        if (flags & ffi::PyBUF_WRITABLE) == ffi::PyBUF_WRITABLE {
2230
            return Err(pyo3::exceptions::PyBufferError::new_err(
2231
                "column buffer is read-only",
2232
            ));
2233
        }
2234
        let (ptr, format) = slf.buffer_ptr_format();
2235
        let obj = slf.as_ptr();
2236
        ffi::Py_INCREF(obj);
2237
        (*view).obj = obj;
2238
        (*view).buf = ptr as *mut c_void;
2239
        (*view).len = (slf.len as isize) * slf.itemsize;
2240
        (*view).readonly = 1;
2241
        (*view).itemsize = slf.itemsize;
2242
        (*view).ndim = 1;
2243
        (*view).format = if (flags & ffi::PyBUF_FORMAT) == ffi::PyBUF_FORMAT {
2244
            format.as_ptr() as *mut c_char
2245
        } else {
2246
            std::ptr::null_mut()
2247
        };
2248
        (*view).shape = if (flags & ffi::PyBUF_ND) == ffi::PyBUF_ND {
2249
            slf.shape.as_ptr() as *mut ffi::Py_ssize_t
2250
        } else {
2251
            std::ptr::null_mut()
2252
        };
2253
        (*view).strides = if (flags & ffi::PyBUF_STRIDES) == ffi::PyBUF_STRIDES {
2254
            slf.strides.as_ptr() as *mut ffi::Py_ssize_t
2255
        } else {
2256
            std::ptr::null_mut()
2257
        };
2258
        (*view).suboffsets = std::ptr::null_mut();
2259
        (*view).internal = std::ptr::null_mut();
2260
        Ok(())
2261
    }
2262

2263
    unsafe fn __releasebuffer__(&self, _view: *mut ffi::Py_buffer) {
2264
        // Nothing to free: format is static, shape/strides live in self, and
2265
        // CPython decrefs view.obj.
2266
    }
2267
}
2268

2269
/// A `node -> node id` array, indexed by node id: `arr[node]` or `memoryview(arr)`
2270
/// for a hot loop (zero-copy buffer, format 'I'/u32; `u32::MAX` = none). Returned by
2271
/// [`GraphSnapshot::roots_via`] (the chain terminal of a functional relation) and
2272
/// [`GraphSnapshot::neighbor_via`] (its one-hop neighbor).
2273
#[pyclass]
2274
pub struct NodeArray {
2275
    inner: Arc<[u32]>,
2276
    shape: [ffi::Py_ssize_t; 1],
2277
    strides: [ffi::Py_ssize_t; 1],
2278
}
2279

2280
#[pymethods]
2281
impl NodeArray {
2282
    fn __len__(&self) -> usize {
2283
        self.inner.len()
2284
    }
2285

2286
    fn __getitem__(&self, index: isize) -> PyResult<u32> {
2287
        let n = self.inner.len() as isize;
2288
        let i = if index < 0 { index + n } else { index };
2289
        if i < 0 || i >= n {
2290
            return Err(pyo3::exceptions::PyIndexError::new_err(
2291
                "node id out of range",
2292
            ));
2293
        }
2294
        Ok(self.inner[i as usize])
2295
    }
2296

2297
    fn __repr__(&self) -> String {
2298
        format!("NodeArray(len={})", self.inner.len())
2299
    }
2300

2301
    /// The whole array as a Python list of node ids.
2302
    fn to_pylist(&self, py: Python<'_>) -> PyResult<PyObject> {
2303
        self.inner.to_vec().into_py_any(py)
2304
    }
2305

2306
    /// Buffer protocol: expose the u32 array zero-copy (read-only, 1-D, format 'I').
2307
    unsafe fn __getbuffer__(
2308
        slf: PyRef<'_, Self>,
2309
        view: *mut ffi::Py_buffer,
2310
        flags: c_int,
2311
    ) -> PyResult<()> {
2312
        if view.is_null() {
2313
            return Err(pyo3::exceptions::PyBufferError::new_err("view is null"));
2314
        }
2315
        if (flags & ffi::PyBUF_WRITABLE) == ffi::PyBUF_WRITABLE {
2316
            return Err(pyo3::exceptions::PyBufferError::new_err(
2317
                "roots buffer is read-only",
2318
            ));
2319
        }
2320
        let obj = slf.as_ptr();
2321
        ffi::Py_INCREF(obj);
2322
        (*view).obj = obj;
2323
        (*view).buf = slf.inner.as_ptr() as *mut c_void;
2324
        (*view).len = (slf.inner.len() as isize) * 4;
2325
        (*view).readonly = 1;
2326
        (*view).itemsize = 4;
2327
        (*view).ndim = 1;
2328
        (*view).format = if (flags & ffi::PyBUF_FORMAT) == ffi::PyBUF_FORMAT {
2329
            c"I".as_ptr() as *mut c_char
2330
        } else {
2331
            std::ptr::null_mut()
2332
        };
2333
        (*view).shape = if (flags & ffi::PyBUF_ND) == ffi::PyBUF_ND {
2334
            slf.shape.as_ptr() as *mut ffi::Py_ssize_t
2335
        } else {
2336
            std::ptr::null_mut()
2337
        };
2338
        (*view).strides = if (flags & ffi::PyBUF_STRIDES) == ffi::PyBUF_STRIDES {
2339
            slf.strides.as_ptr() as *mut ffi::Py_ssize_t
2340
        } else {
2341
            std::ptr::null_mut()
2342
        };
2343
        (*view).suboffsets = std::ptr::null_mut();
2344
        (*view).internal = std::ptr::null_mut();
2345
        Ok(())
2346
    }
2347

2348
    unsafe fn __releasebuffer__(&self, _view: *mut ffi::Py_buffer) {}
2349
}
2350

2351
/// An immutable `(node, node) -> count` map keyed by the *unordered* pair — the
2352
/// resident result of [`GraphSnapshot::fold_via`] (a one-mode projection). Kept native
2353
/// so it can drive a weighted [`GraphSnapshot::dijkstra`] without a per-rel Python
2354
/// callback; dict-like for inspection (`pw[(a, b)]`, `(a, b) in pw`, `len(pw)`,
2355
/// `pw.to_dict()`). Lookups normalize the key to `(min, max)`.
2356
#[pyclass]
2357
pub struct PairWeights {
2358
    inner: Arc<std::collections::HashMap<(u32, u32), u64>>,
2359
}
2360

2361
#[pymethods]
2362
impl PairWeights {
2363
    fn __len__(&self) -> usize {
2364
        self.inner.len()
2365
    }
2366

2367
    fn __contains__(&self, key: (u32, u32)) -> bool {
2368
        let (a, b) = key;
2369
        let k = if a < b { (a, b) } else { (b, a) };
2370
        self.inner.contains_key(&k)
2371
    }
2372

2373
    fn __getitem__(&self, key: (u32, u32)) -> PyResult<u64> {
2374
        let (a, b) = key;
2375
        let k = if a < b { (a, b) } else { (b, a) };
2376
        self.inner
2377
            .get(&k)
2378
            .copied()
2379
            .ok_or_else(|| pyo3::exceptions::PyKeyError::new_err(format!("{:?}", key)))
2380
    }
2381

2382
    /// The count for the unordered pair `(a, b)`, or `default` (`None`) when absent.
2383
    #[pyo3(signature = (a, b, default=None))]
2384
    fn get(&self, a: u32, b: u32, default: Option<u64>) -> Option<u64> {
2385
        let k = if a < b { (a, b) } else { (b, a) };
2386
        self.inner.get(&k).copied().or(default)
2387
    }
2388

2389
    fn __repr__(&self) -> String {
2390
        format!("PairWeights(pairs={})", self.inner.len())
2391
    }
2392

2393
    /// Materialize as a Python dict `{(a, b): count}` (keys are `(min, max)`).
2394
    fn to_dict(&self, py: Python<'_>) -> PyResult<PyObject> {
2395
        let d = pyo3::types::PyDict::new(py);
2396
        for (&(a, b), &c) in self.inner.iter() {
2397
            d.set_item((a, b), c)?;
2398
        }
2399
        d.into_py_any(py)
2400
    }
2401
}
2402

2403
/// Typed storage behind one [`RelView`] column. Every variant holds an
/// `Arc`-shared slice.
enum RelArrayData {
    /// Unsigned 32-bit values.
    U32(Arc<[u32]>),
    /// Signed 64-bit integer values.
    I64(Arc<[i64]>),
    /// 64-bit floating-point values.
    F64(Arc<[f64]>),
}
2409

2410
impl RelArrayData {
2411
    fn len(&self) -> usize {
2412
        match self {
2413
            RelArrayData::U32(a) => a.len(),
2414
            RelArrayData::I64(a) => a.len(),
2415
            RelArrayData::F64(a) => a.len(),
2416
        }
2417
    }
2418
    fn clone_data(&self) -> RelArrayData {
2419
        match self {
2420
            RelArrayData::U32(a) => RelArrayData::U32(a.clone()),
2421
            RelArrayData::I64(a) => RelArrayData::I64(a.clone()),
2422
            RelArrayData::F64(a) => RelArrayData::F64(a.clone()),
2423
        }
2424
    }
2425
    fn ptr(&self) -> *mut c_void {
2426
        match self {
2427
            RelArrayData::U32(a) => a.as_ptr() as *mut c_void,
2428
            RelArrayData::I64(a) => a.as_ptr() as *mut c_void,
2429
            RelArrayData::F64(a) => a.as_ptr() as *mut c_void,
2430
        }
2431
    }
2432
    fn itemsize(&self) -> ffi::Py_ssize_t {
2433
        match self {
2434
            RelArrayData::U32(_) => 4,
2435
            _ => 8,
2436
        }
2437
    }
2438
    fn format(&self) -> *mut c_char {
2439
        let s: &[u8] = match self {
2440
            RelArrayData::U32(_) => b"I\0",
2441
            RelArrayData::I64(_) => b"q\0",
2442
            RelArrayData::F64(_) => b"d\0",
2443
        };
2444
        s.as_ptr() as *mut c_char
2445
    }
2446
}
2447

2448
/// One column of a [`RelView`] as a read-only, 1-D, buffer-protocol array — zero-copy
2449
/// `memoryview(...)` (format `'I'`=u32, `'q'`=i64, `'d'`=f64).
2450
#[pyclass]
2451
pub struct RelArray {
2452
    data: RelArrayData,
2453
    shape: [ffi::Py_ssize_t; 1],
2454
    strides: [ffi::Py_ssize_t; 1],
2455
}
2456

2457
impl RelArray {
2458
    fn new(data: RelArrayData) -> Self {
2459
        let shape = [data.len() as ffi::Py_ssize_t];
2460
        let strides = [data.itemsize()];
2461
        RelArray {
2462
            data,
2463
            shape,
2464
            strides,
2465
        }
2466
    }
2467
}
2468

2469
#[pymethods]
2470
impl RelArray {
2471
    fn __len__(&self) -> usize {
2472
        self.data.len()
2473
    }
2474

2475
    unsafe fn __getbuffer__(
2476
        slf: PyRef<'_, Self>,
2477
        view: *mut ffi::Py_buffer,
2478
        flags: c_int,
2479
    ) -> PyResult<()> {
2480
        if view.is_null() {
2481
            return Err(pyo3::exceptions::PyBufferError::new_err("view is null"));
2482
        }
2483
        if (flags & ffi::PyBUF_WRITABLE) == ffi::PyBUF_WRITABLE {
2484
            return Err(pyo3::exceptions::PyBufferError::new_err(
2485
                "RelArray is read-only",
2486
            ));
2487
        }
2488
        let obj = slf.as_ptr();
2489
        ffi::Py_INCREF(obj);
2490
        (*view).obj = obj;
2491
        (*view).buf = slf.data.ptr();
2492
        (*view).len = (slf.data.len() as ffi::Py_ssize_t) * slf.data.itemsize();
2493
        (*view).readonly = 1;
2494
        (*view).itemsize = slf.data.itemsize();
2495
        (*view).ndim = 1;
2496
        (*view).format = if (flags & ffi::PyBUF_FORMAT) == ffi::PyBUF_FORMAT {
2497
            slf.data.format()
2498
        } else {
2499
            std::ptr::null_mut()
2500
        };
2501
        (*view).shape = if (flags & ffi::PyBUF_ND) == ffi::PyBUF_ND {
2502
            slf.shape.as_ptr() as *mut ffi::Py_ssize_t
2503
        } else {
2504
            std::ptr::null_mut()
2505
        };
2506
        (*view).strides = if (flags & ffi::PyBUF_STRIDES) == ffi::PyBUF_STRIDES {
2507
            slf.strides.as_ptr() as *mut ffi::Py_ssize_t
2508
        } else {
2509
            std::ptr::null_mut()
2510
        };
2511
        (*view).suboffsets = std::ptr::null_mut();
2512
        (*view).internal = std::ptr::null_mut();
2513
        Ok(())
2514
    }
2515

2516
    unsafe fn __releasebuffer__(&self, _view: *mut ffi::Py_buffer) {}
2517
}
2518

2519
/// A bulk view of one node's rels (from [`GraphSnapshot::rel_view`]): `.neighbors`
2520
/// (u32) and `.col(key)` (i64/f64) as aligned, zero-copy [`RelArray`] buffers.
2521
#[pyclass]
2522
pub struct RelView {
2523
    neighbors: Arc<[u32]>,
2524
    cols: Vec<(String, RelArrayData)>,
2525
}
2526

2527
#[pymethods]
2528
impl RelView {
2529
    fn __len__(&self) -> usize {
2530
        self.neighbors.len()
2531
    }
2532

2533
    #[getter]
2534
    fn neighbors(&self) -> RelArray {
2535
        RelArray::new(RelArrayData::U32(self.neighbors.clone()))
2536
    }
2537

2538
    /// The aligned values for property `key` as a [`RelArray`], or `None` if `key`
2539
    /// was not requested.
2540
    fn col(&self, key: &str) -> Option<RelArray> {
2541
        self.cols
2542
            .iter()
2543
            .find(|(k, _)| k == key)
2544
            .map(|(_, d)| RelArray::new(d.clone_data()))
2545
    }
2546

2547
    fn __repr__(&self) -> String {
2548
        format!(
2549
            "RelView(len={}, cols={})",
2550
            self.neighbors.len(),
2551
            self.cols.len()
2552
        )
2553
    }
2554
}
2555

2556
/// A single grouping dimension in the Python-side aggregation spec.
#[derive(Clone)]
enum GroupSpec {
    /// Group by the raw `i64` value of the named column.
    Col(String),
    /// Group by the named column bucketed at the given ascending bounds.
    Bin(String, Vec<i64>),
}
2563

2564
/// Build the core [`rustychickpeas_core::Aggregation`] from a Python-side spec.
2565
/// All the scan/parallelism lives in core; this just translates the spec.
2566
#[allow(clippy::too_many_arguments)]
2567
fn build_core_agg<'a>(
2568
    snapshot: &'a CoreGraphSnapshot,
2569
    labels: &[String],
2570
    where_filters: &[(String, String, i64)],
2571
    having_filters: &[(String, String, i64)],
2572
    by_label: bool,
2573
    group: &[GroupSpec],
2574
    sum_col: Option<&str>,
2575
    through: Option<(&str, rustychickpeas_core::types::Direction)>,
2576
    neighbor_filter: Option<&[u32]>,
2577
    projected_filters: &[(Vec<u32>, String, Vec<ValueId>)],
2578
) -> PyResult<rustychickpeas_core::Aggregation<'a>> {
2579
    let op = |s: &str| {
×
2580
        AggOp::parse(s).map_err(|e| PyErr::new::<pyo3::exceptions::PyValueError, _>(e.to_string()))
×
2581
    };
2582
    let mut agg = snapshot.aggregate(labels.iter().cloned());
×
2583
    for (c, o, v) in where_filters {
×
2584
        agg = agg.filter(c.clone(), op(o)?, *v);
×
2585
    }
2586
    for (proj, col, allowed) in projected_filters {
×
2587
        agg = agg.filter_via(proj, col.clone(), allowed.iter().cloned());
×
2588
    }
2589
    for (c, o, v) in having_filters {
×
2590
        agg = agg.having(c.clone(), op(o)?, *v);
×
2591
    }
2592
    if by_label {
×
2593
        agg = agg.by_label();
×
2594
    }
2595
    for gs in group {
×
2596
        agg = match gs {
×
2597
            GroupSpec::Col(c) => agg.by(c.clone()),
×
2598
            GroupSpec::Bin(c, e) => agg.bin(c.clone(), e.clone()),
×
2599
        };
2600
    }
2601
    if let Some(c) = sum_col {
×
2602
        agg = agg.sum(c);
×
2603
    }
2604
    if let Some((rt, dir)) = through {
×
2605
        agg = agg.through(rt, dir);
×
2606
    }
2607
    if let Some(ids) = neighbor_filter {
×
2608
        agg = agg.only_neighbors(ids.iter().copied());
×
2609
    }
2610
    Ok(agg)
×
2611
}
2612

2613
/// Lazy neighbor-grouping query (see `GraphSnapshot.neighbor_groups`). Immutable:
2614
/// `.project(...)` returns a new builder; `.sizes()` / `.top_by_size(...)` run it
2615
/// in parallel with the GIL released.
2616
#[pyclass]
2617
#[derive(Clone)]
2618
pub struct NeighborGroups {
2619
    snapshot: Arc<CoreGraphSnapshot>,
2620
    sources: Vec<u32>,
2621
    rel: String,
2622
    direction: rustychickpeas_core::types::Direction,
2623
    project: Vec<(rustychickpeas_core::types::Direction, String)>,
2624
}
2625

2626
#[pymethods]
2627
impl NeighborGroups {
2628
    /// Project each neighbor to its group node via a `follow`-style list of
2629
    /// `(direction, rel_type)` steps. Returns a new builder.
2630
    fn project(&self, steps: Vec<(Direction, String)>) -> NeighborGroups {
2631
        NeighborGroups {
2632
            snapshot: self.snapshot.clone(),
2633
            sources: self.sources.clone(),
2634
            rel: self.rel.clone(),
2635
            direction: self.direction,
2636
            project: steps.into_iter().map(|(d, r)| (d.into(), r)).collect(),
2637
        }
2638
    }
2639

2640
    /// Per source, the size of its largest cohort: `(source, size)`.
2641
    fn sizes(&self, py: Python<'_>) -> Vec<(u32, u32)> {
2642
        let snapshot = self.snapshot.clone();
2643
        let sources = self.sources.clone();
2644
        let rel = self.rel.clone();
2645
        let direction = self.direction;
2646
        let project = self.project.clone();
2647
        py.allow_threads(move || {
2648
            let steps: Vec<(rustychickpeas_core::types::Direction, &str)> =
2649
                project.iter().map(|(d, r)| (*d, r.as_str())).collect();
2650
            snapshot
2651
                .neighbor_groups(&sources, &rel, direction)
2652
                .project(&steps)
2653
                .sizes()
2654
        })
2655
    }
2656

2657
    /// The top `n` sources by largest cohort size: `(source, size)`, size
2658
    /// descending. Ties break by the `tie` node property (read as i64, ascending)
2659
    /// when given — so the order can match a query's output-id ordering — else by
2660
    /// source id ascending.
2661
    #[pyo3(signature = (n, tie=None))]
2662
    fn top_by_size(&self, py: Python<'_>, n: usize, tie: Option<String>) -> Vec<(u32, u32)> {
2663
        let snapshot = self.snapshot.clone();
2664
        let sources = self.sources.clone();
2665
        let rel = self.rel.clone();
2666
        let direction = self.direction;
2667
        let project = self.project.clone();
2668
        py.allow_threads(move || {
2669
            let steps: Vec<(rustychickpeas_core::types::Direction, &str)> =
2670
                project.iter().map(|(d, r)| (*d, r.as_str())).collect();
2671
            snapshot
2672
                .neighbor_groups(&sources, &rel, direction)
2673
                .project(&steps)
2674
                .top_by_size(n, tie.as_deref())
2675
        })
2676
    }
2677
}
2678

2679
/// Fluent aggregation builder (immutable: each step returns a new builder), created
2680
/// by [`GraphSnapshot::aggregate`]. `.run()` executes the scan and returns an
2681
/// [`AggResult`].
2682
#[pyclass(name = "Aggregation")]
2683
#[derive(Clone)]
2684
pub struct Aggregation {
2685
    snapshot: Arc<CoreGraphSnapshot>,
2686
    labels: Vec<String>,
2687
    where_filters: Vec<(String, String, i64)>,
2688
    having_filters: Vec<(String, String, i64)>,
2689
    by_label: bool,
2690
    group: Vec<GroupSpec>,
2691
    sum_col: Option<String>,
2692
    through: Option<(String, rustychickpeas_core::types::Direction)>,
2693
    neighbor_filter: Option<Vec<u32>>,
2694
    /// `(projection, column, allowed value ids)` projected-property population filters.
2695
    projected_filters: Vec<(Vec<u32>, String, Vec<ValueId>)>,
2696
}
2697

2698
#[pymethods]
2699
impl Aggregation {
2700
    fn __repr__(&self) -> String {
2701
        format!(
2702
            "Aggregation(labels={:?}, where={}, having={}, group_dims={}, sum={:?})",
2703
            self.labels,
2704
            self.where_filters.len(),
2705
            self.having_filters.len(),
2706
            self.by_label as usize + self.group.len(),
2707
            self.sum_col,
2708
        )
2709
    }
2710

2711
    /// Population predicate `column op value` (op ∈ `<,<=,>,>=,==,!=`); rows passing
2712
    /// all of these count toward `total`.
2713
    #[pyo3(name = "where")]
2714
    fn where_(&self, column: String, op: String, value: i64) -> Aggregation {
2715
        let mut a = self.clone();
2716
        a.where_filters.push((column, op, value));
2717
        a
2718
    }
2719

2720
    /// Population predicate on a *projected* node: keep a source whose projected
2721
    /// node (`projection[source]`, e.g. a `roots_via` array mapping a message to its
2722
    /// thread root) has `column` in `values`. Any value type (membership test, not
2723
    /// the i64 comparison of `where`); strings not interned in this snapshot can't
2724
    /// match and are dropped. Applied with the `where` filters.
2725
    fn where_via(
2726
        &self,
2727
        projection: &NodeArray,
2728
        column: String,
2729
        values: Vec<Bound<'_, PyAny>>,
2730
    ) -> PyResult<Aggregation> {
2731
        use rustychickpeas_core::PropertyValue;
2732
        let mut allowed: Vec<ValueId> = Vec::with_capacity(values.len());
2733
        for v in &values {
2734
            let vid = match py_to_property_value(v)? {
2735
                PropertyValue::String(s) => self.snapshot.atoms.get_id(&s).map(ValueId::Str),
2736
                PropertyValue::Integer(i) => Some(ValueId::I64(i)),
2737
                PropertyValue::Float(f) => Some(ValueId::from_f64(f)),
2738
                PropertyValue::Boolean(b) => Some(ValueId::Bool(b)),
2739
                PropertyValue::InternedString(_) => {
2740
                    return Err(PyErr::new::<pyo3::exceptions::PyValueError, _>(
2741
                        "InternedString not supported here",
2742
                    ))
2743
                }
2744
            };
2745
            if let Some(vid) = vid {
2746
                allowed.push(vid);
2747
            }
2748
        }
2749
        let mut a = self.clone();
2750
        a.projected_filters
2751
            .push((projection.inner.to_vec(), column, allowed));
2752
        Ok(a)
2753
    }
2754

2755
    /// Extra predicate applied to grouped rows only (after the population filters).
2756
    fn having(&self, column: String, op: String, value: i64) -> Aggregation {
2757
        let mut a = self.clone();
2758
        a.having_filters.push((column, op, value));
2759
        a
2760
    }
2761

2762
    /// Group by the source node label (returned in rows as its name).
2763
    fn by_label(&self) -> Aggregation {
2764
        let mut a = self.clone();
2765
        a.by_label = true;
2766
        a
2767
    }
2768

2769
    /// Group by a dense `i64` column's value.
2770
    fn by(&self, column: String) -> Aggregation {
2771
        let mut a = self.clone();
2772
        a.group.push(GroupSpec::Col(column));
2773
        a
2774
    }
2775

2776
    /// Group by a column bucketed at ascending `bounds` (bucket = count of
2777
    /// `bounds <= value`); the row field is `"{column}_bin"`.
2778
    fn bin(&self, column: String, bounds: Vec<i64>) -> Aggregation {
2779
        let mut a = self.clone();
2780
        a.group.push(GroupSpec::Bin(column, bounds));
2781
        a
2782
    }
2783

2784
    /// Also sum this `i64` column per group (row field `"sum"`).
2785
    fn sum(&self, column: String) -> Aggregation {
2786
        let mut a = self.clone();
2787
        a.sum_col = Some(column);
2788
        a
2789
    }
2790

2791
    /// Count rels of `rel_type`/`direction` out of each source node instead of
2792
    /// counting nodes, grouping additionally by the neighbor id (row field
2793
    /// `"neighbor"`). `total` still counts source nodes.
2794
    fn through(&self, rel_type: String, direction: Direction) -> Aggregation {
2795
        let mut a = self.clone();
2796
        a.through = Some((rel_type, direction.into()));
2797
        a
2798
    }
2799

2800
    /// With `through`, count only neighbors whose node id is in `node_ids` (others
2801
    /// skipped), so `.rows` has just those neighbors.
2802
    fn only_neighbors(&self, node_ids: Vec<u32>) -> Aggregation {
2803
        let mut a = self.clone();
2804
        a.neighbor_filter = Some(node_ids);
2805
        a
2806
    }
2807

2808
    /// Execute: returns an [`AggResult`] with `.total` and self-describing dict
2809
    /// `.rows` (keys: the group fields, then `"count"` and `"sum"` if requested).
2810
    fn run(&self, py: Python<'_>) -> PyResult<AggResult> {
2811
        let agg = build_core_agg(
2812
            &self.snapshot,
2813
            &self.labels,
2814
            &self.where_filters,
2815
            &self.having_filters,
2816
            self.by_label,
2817
            &self.group,
2818
            self.sum_col.as_deref(),
2819
            self.through.as_ref().map(|(rt, dir)| (rt.as_str(), *dir)),
2820
            self.neighbor_filter.as_deref(),
2821
            &self.projected_filters,
2822
        )?;
2823
        // The parallel scan lives in core; release the GIL while it runs.
2824
        let res = py
2825
            .allow_threads(|| agg.run())
2826
            .map_err(|e| PyErr::new::<pyo3::exceptions::PyValueError, _>(e.to_string()))?;
2827
        let has_sum = self.sum_col.is_some();
2828

2829
        let rows = pyo3::types::PyList::empty(py);
2830
        for row in &res.rows {
2831
            let d = pyo3::types::PyDict::new(py);
2832
            for (pos, (field, val)) in res.fields.iter().zip(row.key.iter()).enumerate() {
2833
                if self.by_label && pos == 0 {
2834
                    // The label key is its index into `labels`; emit the name.
2835
                    d.set_item(field, &self.labels[*val as usize])?;
2836
                } else {
2837
                    d.set_item(field, *val)?;
2838
                }
2839
            }
2840
            d.set_item("count", row.count)?;
2841
            if has_sum {
2842
                d.set_item("sum", row.sum)?;
2843
            }
2844
            rows.append(d)?;
2845
        }
2846
        Ok(AggResult {
2847
            total: res.total,
2848
            rows: rows.into_any().unbind(),
2849
        })
2850
    }
2851
}
2852

2853
/// Result of [`Aggregation::run`]: `total` (population count) and `rows`
2854
/// (a list of self-describing dicts).
2855
#[pyclass(name = "AggResult")]
2856
pub struct AggResult {
2857
    #[pyo3(get)]
2858
    total: u64,
2859
    #[pyo3(get)]
2860
    rows: PyObject,
2861
}
2862

2863
#[pymethods]
2864
impl AggResult {
2865
    fn __repr__(&self, py: Python<'_>) -> PyResult<String> {
2866
        let n = self.rows.bind(py).len()?;
2867
        Ok(format!(
2868
            "AggResult(total={}, rows={} groups)",
2869
            self.total, n
2870
        ))
2871
    }
2872
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc