• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

freeeve / rustychickpeas / 28033891415

23 Jun 2026 02:33PM UTC coverage: 89.901% (+0.02%) from 89.881%
28033891415

push

github

freeeve
fix(pipeline,reader): migrate with_version callers to builder idiom

Follow-on to eb1865a: with_version(v, cap, cap) -> new(cap, cap).with_version(v).

3347 of 3723 relevant lines covered (89.9%)

2.77 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/rustychickpeas-python/src/graph_snapshot.rs
1
//! GraphSnapshot Python wrapper
2

3
use crate::direction::Direction;
4
use crate::node::Node;
5
use crate::relationship::Relationship;
6
use crate::utils::{py_to_property_value, value_id_to_pyobject};
7
use pyo3::ffi;
8
use pyo3::prelude::*;
9
use pyo3::IntoPyObjectExt;
10
use roaring::RoaringBitmap;
11
use rustychickpeas_core::bitmap::NodeSet;
12
use rustychickpeas_core::types::PropertyKey;
13
use rustychickpeas_core::{
14
    AggOp, ColumnDtype, GraphSnapshot as CoreGraphSnapshot, Label, RelationshipRef,
15
    RelationshipType, ValueId,
16
};
17
use std::os::raw::{c_char, c_int, c_void};
18
use std::sync::{Arc, Mutex, PoisonError};
19

20
/// Python wrapper for GraphSnapshot
21
#[pyclass(name = "GraphSnapshot")]
22
pub struct GraphSnapshot {
23
    pub(crate) snapshot: std::sync::Arc<CoreGraphSnapshot>,
24
}
25

26
/// Iterator over the node IDs of a GraphSnapshot
27
///
28
/// Node IDs in a finalized snapshot are dense in `0..n_nodes` (the same range
29
/// accepted by `GraphSnapshot.node()`), so iteration only needs the bounds.
30
#[pyclass(name = "NodeIdIter")]
31
pub struct NodeIdIter {
32
    current: u32,
33
    end: u32,
34
}
35

36
#[pymethods]
37
impl NodeIdIter {
38
    fn __iter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> {
39
        slf
40
    }
41

42
    fn __next__(mut slf: PyRefMut<'_, Self>) -> Option<u32> {
43
        if slf.current < slf.end {
44
            let id = slf.current;
45
            slf.current += 1;
46
            Some(id)
47
        } else {
48
            None
49
        }
50
    }
51
}
52

53
impl GraphSnapshot {
54
    /// Get string ID from string
55
    /// Uses the reverse index in Atoms for O(1) lookup
56
    fn get_string_id(&self, s: &str) -> Option<u32> {
57
        self.snapshot.atoms.get_id(s)
58
    }
59

60
    /// Get label from string
61
    fn label_from_str(&self, s: &str) -> Option<Label> {
62
        self.get_string_id(s).map(Label::new)
63
    }
64

65
    /// Get relationship type from string
66
    fn rel_type_from_str(&self, s: &str) -> Option<RelationshipType> {
67
        self.get_string_id(s).map(RelationshipType::new)
68
    }
69

70
    /// Get property key from string
71
    fn property_key_from_str(&self, s: &str) -> Option<PropertyKey> {
72
        self.get_string_id(s)
73
    }
74

75
    /// Convert a Python value to a core [`ValueId`], resolving a string to its
76
    /// interned atom id. Returns `Ok(None)` when a string value is not interned
77
    /// in this snapshot — no node can carry it, so predicate/lookup callers
78
    /// short-circuit to "no match" rather than erroring.
79
    fn py_value_to_id_opt(&self, value: &Bound<'_, PyAny>) -> PyResult<Option<ValueId>> {
80
        use rustychickpeas_core::PropertyValue;
81
        Ok(match py_to_property_value(value)? {
82
            PropertyValue::String(s) => self.get_string_id(&s).map(ValueId::Str),
83
            PropertyValue::Integer(i) => Some(ValueId::I64(i)),
84
            PropertyValue::Float(f) => Some(ValueId::from_f64(f)),
85
            PropertyValue::Boolean(b) => Some(ValueId::Bool(b)),
86
            PropertyValue::InternedString(_) => {
87
                return Err(PyErr::new::<pyo3::exceptions::PyValueError, _>(
88
                    "InternedString not supported here",
89
                ));
90
            }
91
        })
92
    }
93
}
94

95
impl GraphSnapshot {
96
    /// Internal constructor (not exposed to Python)
97
    /// Takes ownership of a GraphSnapshot
98
    pub(crate) fn new(snapshot: CoreGraphSnapshot) -> Self {
99
        Self {
100
            snapshot: std::sync::Arc::new(snapshot),
101
        }
102
    }
103

104
    /// Internal constructor from Arc (for manager.get_graph_snapshot)
105
    pub(crate) fn from_arc(snapshot: std::sync::Arc<CoreGraphSnapshot>) -> Self {
106
        Self { snapshot }
107
    }
108
}
109

110
#[pymethods]
111
impl GraphSnapshot {
112
    fn __repr__(&self) -> String {
113
        let version = self
114
            .snapshot
115
            .version()
116
            .map(|v| v.to_string())
117
            .unwrap_or_else(|| "None".to_string());
118
        format!(
119
            "GraphSnapshot(nodes={}, rels={}, version={})",
120
            self.snapshot.n_nodes, self.snapshot.n_rels, version
121
        )
122
    }
123

124
    fn __len__(&self) -> usize {
125
        self.snapshot.n_nodes as usize
126
    }
127

128
    /// Iterate over all node IDs in the snapshot (0..n_nodes)
129
    ///
130
    /// Yields exactly the node IDs accepted by `node()`.
131
    fn __iter__(&self) -> NodeIdIter {
132
        NodeIdIter {
133
            current: 0,
134
            end: self.snapshot.n_nodes,
135
        }
136
    }
137

138
    /// Get number of nodes
139
    fn node_count(&self) -> u32 {
140
        self.snapshot.n_nodes
141
    }
142

143
    /// Get number of relationships
144
    fn relationship_count(&self) -> u64 {
145
        self.snapshot.n_rels
146
    }
147

148
    /// Get node labels
149
    fn node_labels(&self, node_id: u32) -> PyResult<Vec<String>> {
150
        // GraphSnapshot doesn't store labels per node directly
151
        // We need to iterate through label_index to find which labels contain this node
152
        let mut labels = Vec::new();
153
        for (label, node_set) in &self.snapshot.label_index {
154
            if node_set.contains(node_id) {
155
                if let Some(label_str) = self.snapshot.resolve_string(label.id()) {
156
                    labels.push(label_str.to_string());
157
                }
158
            }
159
        }
160
        Ok(labels)
161
    }
162

163
    /// Get nodes with a specific label
164
    fn nodes_with_label(&self, label: String) -> PyResult<Vec<u32>> {
165
        // Call Rust function - it returns None if label doesn't exist
166
        // We need to distinguish between "label doesn't exist" vs "label exists but no nodes"
167
        // Since nodes_with_label_id returns Option<&NodeSet>, None means label not in index
168
        let label_id = self.label_from_str(&label);
169
        if label_id.is_none() {
170
            return Err(PyErr::new::<pyo3::exceptions::PyValueError, _>(format!(
171
                "Label '{}' not found",
172
                label
173
            )));
174
        }
175

176
        // Label exists, now get nodes (will return Some even if empty)
177
        if let Some(node_set) = self.snapshot.nodes_with_label(&label) {
178
            Ok(node_set.iter().collect())
179
        } else {
180
            // This shouldn't happen if label exists, but handle it
181
            Ok(Vec::new())
182
        }
183
    }
184

185
    /// Get a Node object for the given node ID
186
    fn node(&self, node_id: u32) -> PyResult<Node> {
187
        if node_id >= self.snapshot.n_nodes {
188
            return Err(PyErr::new::<pyo3::exceptions::PyValueError, _>(format!(
189
                "Node ID {} out of range (max: {})",
190
                node_id,
191
                self.snapshot.n_nodes.saturating_sub(1)
192
            )));
193
        }
194
        Ok(Node {
195
            snapshot: self.snapshot.clone(),
196
            node_id,
197
        })
198
    }
199

200
    /// Get relationships (neighbors) of a node with optional type filtering
201
    ///
202
    /// # Arguments
203
    /// * `node_id` - The node ID
204
    /// * `direction` - Direction of relationships (Outgoing, Incoming, Both)
205
    /// * `rel_types` - Optional list of relationship types to filter by
206
    #[pyo3(signature = (node_id, direction=Direction::Outgoing, rel_types=None))]
207
    fn relationships(
208
        &self,
209
        node_id: u32,
210
        direction: Direction,
211
        rel_types: Option<Vec<String>>,
212
    ) -> PyResult<Vec<Relationship>> {
213
        use rustychickpeas_core::types::RelationshipType;
214

215
        if node_id >= self.snapshot.n_nodes {
216
            return Err(PyErr::new::<pyo3::exceptions::PyValueError, _>(format!(
217
                "Node ID {} out of range (max: {})",
218
                node_id,
219
                self.snapshot.n_nodes.saturating_sub(1)
220
            )));
221
        }
222

223
        // Convert string types to RelationshipType IDs via the public resolver.
224
        let rel_type_ids: Option<Vec<RelationshipType>> = rel_types.as_ref().and_then(|types| {
225
            let ids: Vec<RelationshipType> = types
226
                .iter()
227
                .filter_map(|s| self.snapshot.rel_type(s))
228
                .collect();
229
            if ids.is_empty() && !types.is_empty() {
230
                return None;
231
            }
232
            Some(ids)
233
        });
234

235
        let mut relationships = Vec::new();
236

237
        // Handle outgoing relationships
238
        if matches!(direction, Direction::Outgoing | Direction::Both) {
239
            let start = self.snapshot.out_offsets[node_id as usize] as usize;
240
            let end = self.snapshot.out_offsets[node_id as usize + 1] as usize;
241

242
            for (idx, (&_neighbor, &rel_type)) in self.snapshot.out_nbrs[start..end]
243
                .iter()
244
                .zip(self.snapshot.out_types[start..end].iter())
245
                .enumerate()
246
            {
247
                let rel_csr_index = start + idx;
248

249
                // Apply type filter if provided
250
                if let Some(ref type_ids) = rel_type_ids {
251
                    if !type_ids.contains(&rel_type) {
252
                        continue;
253
                    }
254
                } else if rel_types.as_ref().map(|t| !t.is_empty()).unwrap_or(false) {
255
                    // Filter was provided but no types found - skip
256
                    continue;
257
                }
258

259
                relationships.push(Relationship {
260
                    snapshot: self.snapshot.clone(),
261
                    rel_index: rel_csr_index as u32,
262
                    is_outgoing: true,
263
                });
264
            }
265
        }
266

267
        // Handle incoming relationships
268
        if matches!(direction, Direction::Incoming | Direction::Both) {
269
            let start = self.snapshot.in_offsets[node_id as usize] as usize;
270
            let end = self.snapshot.in_offsets[node_id as usize + 1] as usize;
271

272
            for (idx, (&_neighbor, &rel_type)) in self.snapshot.in_nbrs[start..end]
273
                .iter()
274
                .zip(self.snapshot.in_types[start..end].iter())
275
                .enumerate()
276
            {
277
                let rel_csr_index = start + idx;
278

279
                // Apply type filter if provided
280
                if let Some(ref type_ids) = rel_type_ids {
281
                    if !type_ids.contains(&rel_type) {
282
                        continue;
283
                    }
284
                } else if rel_types.as_ref().map(|t| !t.is_empty()).unwrap_or(false) {
285
                    // Filter was provided but no types found - skip
286
                    continue;
287
                }
288

289
                relationships.push(Relationship {
290
                    snapshot: self.snapshot.clone(),
291
                    rel_index: rel_csr_index as u32,
292
                    is_outgoing: false,
293
                });
294
            }
295
        }
296

297
        Ok(relationships)
298
    }
299

300
    /// Neighbor node IDs in `direction`, optionally restricted to `rel_types`
301
    /// (deduplicated, ascending, when types are given).
302
    #[pyo3(signature = (node_id, direction=Direction::Outgoing, rel_types=None))]
303
    fn neighbor_ids(
304
        &self,
305
        node_id: u32,
306
        direction: Direction,
307
        rel_types: Option<Vec<String>>,
308
    ) -> Vec<u32> {
309
        match rel_types {
310
            None => self.snapshot.neighbors(node_id, direction.into()).collect(),
311
            Some(types) => {
312
                let mut set = RoaringBitmap::new();
313
                for t in &types {
314
                    for n in self
315
                        .snapshot
316
                        .neighbors_by_type(node_id, direction.into(), t.as_str())
317
                    {
318
                        set.insert(n);
319
                    }
320
                }
321
                set.iter().collect()
322
            }
323
        }
324
    }
325

326
    /// Histogram of the neighbours reached from `sources` via `rel_type` rels in
327
    /// `direction`: for each source node, count how many of its `rel_type`
328
    /// neighbours land on each target. Returns a dict mapping target node id to
329
    /// count. The whole aggregation runs in Rust on a single call, so it is far
330
    /// faster than counting neighbours in a Python loop.
331
    fn neighbor_counts(
332
        &self,
333
        sources: Vec<u32>,
334
        direction: Direction,
335
        rel_type: &str,
336
    ) -> std::collections::HashMap<u32, usize> {
337
        // Core returns a hashbrown map; collect into a std map so PyO3 hands Python a dict.
338
        self.snapshot
339
            .neighbor_counts(sources, direction.into(), rel_type)
340
            .into_iter()
341
            .collect()
342
    }
343

344
    /// Get neighbors of a node as Node objects
345
    /// Returns a list of Node objects for neighbors in the specified direction
346
    #[pyo3(signature = (node_id, direction=Direction::Outgoing, rel_types=None))]
347
    fn neighbors(
348
        &self,
349
        node_id: u32,
350
        direction: Direction,
351
        rel_types: Option<Vec<String>>,
352
    ) -> PyResult<Vec<Node>> {
353
        // Get relationships (optionally type-filtered) and extract neighbor IDs.
354
        let rels = self.relationships(node_id, direction, rel_types)?;
355
        let mut neighbor_ids = Vec::new();
356

357
        for rel in rels {
358
            let neighbor_id = if rel.is_outgoing {
359
                // For outgoing relationships, the end node is in out_nbrs
360
                let idx = rel.rel_index as usize;
361
                if idx < self.snapshot.out_nbrs.len() {
362
                    self.snapshot.out_nbrs[idx]
363
                } else {
364
                    continue; // Skip invalid relationship
365
                }
366
            } else {
367
                // For incoming relationships, the start node (neighbor) is in in_nbrs
368
                let idx = rel.rel_index as usize;
369
                if idx < self.snapshot.in_nbrs.len() {
370
                    self.snapshot.in_nbrs[idx]
371
                } else {
372
                    continue; // Skip invalid relationship
373
                }
374
            };
375
            neighbor_ids.push(neighbor_id);
376
        }
377

378
        Ok(neighbor_ids
379
            .into_iter()
380
            .map(|id| Node {
381
                snapshot: self.snapshot.clone(),
382
                node_id: id,
383
            })
384
            .collect())
385
    }
386

387
    /// Degree of a node — O(1) from the CSR offsets when untyped; with
388
    /// `rel_type`, the count of neighbors reached via that type.
389
    #[pyo3(signature = (node_id, direction=Direction::Outgoing, rel_type=None))]
390
    fn degree(&self, node_id: u32, direction: Direction, rel_type: Option<&str>) -> usize {
391
        match rel_type {
392
            Some(rt) => self
393
                .snapshot
394
                .neighbors_by_type(node_id, direction.into(), rt)
395
                .count(),
396
            None => crate::utils::csr_degree(&self.snapshot, node_id, direction.into()),
397
        }
398
    }
399

400
    /// Whether `node_id` carries `label` — an O(1) label-index check, vs the
401
    /// `"X" in node_labels(n)` scan it replaces.
402
    fn has_label(&self, node_id: u32, label: &str) -> bool {
403
        self.snapshot.has_label(node_id, label)
404
    }
405

406
    /// Whether `node_id` has any neighbor via `rel_type` in `direction`
407
    /// (existence check, short-circuits on the first match).
408
    fn has_rel(&self, node_id: u32, direction: Direction, rel_type: &str) -> bool {
409
        self.snapshot.has_rel(node_id, direction.into(), rel_type)
410
    }
411

412
    /// Whether `node_id` has a neighbor (via `rel_type`, `direction`) whose node
413
    /// property `key` equals `value` — resolves the value once, then compares ids
414
    /// per neighbor (vs a per-neighbor Python property read).
415
    fn has_neighbor_with_property(
416
        &self,
417
        node_id: u32,
418
        direction: Direction,
419
        rel_type: &str,
420
        key: &str,
421
        value: &Bound<'_, PyAny>,
422
    ) -> PyResult<bool> {
423
        let Some(value_id) = self.py_value_to_id_opt(value)? else {
424
            return Ok(false);
425
        };
426
        Ok(self.snapshot.has_neighbor_with_property(
427
            node_id,
428
            direction.into(),
429
            rel_type,
430
            key,
431
            value_id,
432
        ))
433
    }
434

435
    /// The smallest node carrying `label` with property `key` == `value`, or
436
    /// `None` — collapses `nodes_with_property(..)[0]`, label-scoped so a `name`
437
    /// shared across labels stays unambiguous.
438
    fn node_with_label_property(
439
        &self,
440
        label: &str,
441
        key: &str,
442
        value: &Bound<'_, PyAny>,
443
    ) -> PyResult<Option<u32>> {
444
        let Some(value_id) = self.py_value_to_id_opt(value)? else {
445
            return Ok(None);
446
        };
447
        Ok(self.snapshot.node_with_label_property(label, key, value_id))
448
    }
449

450
    /// First neighbor of `node_id` via `rel_type` in `direction`, or `None` —
451
    /// returns the id (not a `Node`), short-circuiting the scan.
452
    fn first_neighbor(&self, node_id: u32, direction: Direction, rel_type: &str) -> Option<u32> {
453
        self.snapshot
454
            .first_neighbor(node_id, direction.into(), rel_type)
455
    }
456

457
    /// Follow a fixed chain of `(direction, rel_type)` steps from `start`, taking
458
    /// the first neighbor at each; `None` if a step has no neighbor. Returns the
459
    /// final node id (not a list of `Node`s).
460
    fn follow(&self, start: u32, steps: Vec<(Direction, String)>) -> Option<u32> {
461
        let steps: Vec<(rustychickpeas_core::Direction, &str)> = steps
462
            .iter()
463
            .map(|(d, r)| ((*d).into(), r.as_str()))
464
            .collect();
465
        self.snapshot.follow(start, &steps)
466
    }
467

468
    /// The root each node reaches by following the *functional* `rel_type` in `direction`
469
    /// (each node has one successor — e.g. a message's `replyOf` thread root); a node
470
    /// already terminal maps to itself. Returns a `NodeArray` array indexed by node id —
471
    /// `roots[node]` or `memoryview(roots)` in a hot loop. The forest-root array is
472
    /// built once and cached on the snapshot, so this is the bulk form to reach for
473
    /// over a per-node `root_via`. `None` if `rel_type` is unknown.
474
    fn roots_via(&self, rel_type: &str, direction: Direction) -> Option<NodeArray> {
475
        let rt = self.snapshot.relationship_type_from_str(rel_type)?;
476
        let inner = self.snapshot.roots_via(rt, direction.into());
477
        let len = inner.len() as ffi::Py_ssize_t;
478
        Some(NodeArray {
479
            inner,
480
            shape: [len],
481
            strides: [4],
482
        })
483
    }
484

485
    /// The root of a single `node` via the functional `rel_type` in `direction` (see
486
    /// `roots_via`). Convenience for a one-off lookup; in a per-node loop prefer
487
    /// `roots_via` and index it. `None` if `rel_type` is unknown.
488
    fn root_via(&self, node: u32, rel_type: &str, direction: Direction) -> Option<u32> {
489
        let rt = self.snapshot.relationship_type_from_str(rel_type)?;
490
        Some(self.snapshot.root_via(node, rt, direction.into()))
491
    }
492

493
    /// The single neighbor each node reaches via the *functional* `rel_type` in `direction`
494
    /// (one hop — e.g. a message's `hasCreator` -> its creator). The depth-1 sibling of
495
    /// `roots_via`: where that follows the chain to its terminal, this takes one step.
496
    /// Returns a `NodeArray` indexed by node id; a node with no such neighbor maps to
497
    /// `u32::MAX` (4294967295). Built fresh each call (one `first_neighbor` per node,
498
    /// GIL released), so hold the result for a hot loop. `None` if `rel_type` is unknown.
499
    fn neighbor_via(
500
        &self,
501
        py: Python<'_>,
502
        rel_type: &str,
503
        direction: Direction,
504
    ) -> Option<NodeArray> {
505
        let rt = self.snapshot.relationship_type_from_str(rel_type)?;
506
        let dir: rustychickpeas_core::Direction = direction.into();
507
        let snapshot = self.snapshot.clone();
508
        let inner = py.allow_threads(move || snapshot.neighbor_via(rt, dir));
509
        let len = inner.len() as ffi::Py_ssize_t;
510
        Some(NodeArray {
511
            inner,
512
            shape: [len],
513
            strides: [4],
514
        })
515
    }
516

517
    /// Fold relationship `rel_type` (in `direction`) into a `PairWeights` map by projecting
518
    /// both endpoints of each rel through `projection` (a `NodeArray`, e.g. from
519
    /// `neighbor_via` or `roots_via`) — the one-mode / bipartite projection ("network
520
    /// folding") of a relation onto a derived node set. For each `rel_type` rel `a -> b`,
521
    /// the unordered pair `(min, max)` of `projection[a]` / `projection[b]` gets one
522
    /// count; self-pairs and endpoints mapping to the `u32::MAX` sentinel are skipped.
523
    /// Runs the parallel core kernel with the GIL released. The result stays resident
524
    /// (no per-pair Python object) so it can drive a native weighted `dijkstra` without
525
    /// a per-rel callback; `to_dict()` materializes it. E.g. BI Q19's person
526
    /// interaction graph: ``g.fold_via("replyOf", Direction.Outgoing,
527
    /// g.neighbor_via("hasCreator", Direction.Incoming))``.
528
    fn fold_via(
529
        &self,
530
        py: Python<'_>,
531
        rel_type: &str,
532
        direction: Direction,
533
        projection: &NodeArray,
534
    ) -> PairWeights {
535
        let snapshot = self.snapshot.clone();
536
        let dir: rustychickpeas_core::Direction = direction.into();
537
        let rel_type = rel_type.to_owned();
538
        let proj = projection.inner.clone();
539
        let map: std::collections::HashMap<(u32, u32), u64> = py.allow_threads(move || {
540
            snapshot
541
                .fold_via(rel_type.as_str(), dir, proj.as_ref())
542
                .into_iter()
543
                .collect()
544
        });
545
        PairWeights {
546
            inner: Arc::new(map),
547
        }
548
    }
549

550
    /// Single-source weighted shortest paths (Dijkstra) from `source` along `rel_type` in
551
    /// `direction`, with rel costs derived from a resident `weights` map (`PairWeights`,
552
    /// e.g. from `fold_via`). The cost of rel `(u, v)` is `1.0 / (weights[(u, v)] + base)`;
553
    /// a pair absent from `weights` is untraversable when `prune_missing` (else costs
554
    /// `1.0 / base`). Returns `{node_id: cost}` for every node reached (the source maps to
555
    /// `0.0`); pass `target` to stop once it is settled. The weight lookup runs inside the
556
    /// native kernel with the GIL released — no per-rel Python callback. E.g. BI Q19's
557
    /// interaction path: ``g.dijkstra(p1, Direction.Outgoing, "knows", weights=interaction,
558
    /// base=0.0, prune_missing=True)``.
559
    #[allow(clippy::too_many_arguments)]
560
    #[pyo3(signature = (source, direction, rel_type, *, weights, base=0.0, prune_missing=false, target=None))]
561
    fn dijkstra(
562
        &self,
563
        py: Python<'_>,
564
        source: u32,
565
        direction: Direction,
566
        rel_type: &str,
567
        weights: &PairWeights,
568
        base: f64,
569
        prune_missing: bool,
570
        target: Option<u32>,
571
    ) -> std::collections::HashMap<u32, f64> {
572
        let snapshot = self.snapshot.clone();
573
        let dir: rustychickpeas_core::Direction = direction.into();
574
        let rel_type = rel_type.to_owned();
575
        let map = weights.inner.clone();
576
        py.allow_threads(move || {
577
            let paths = snapshot.dijkstra(source, dir, rel_type.as_str(), target, |from, r| {
578
                let key = if from < r.neighbor {
579
                    (from, r.neighbor)
580
                } else {
581
                    (r.neighbor, from)
582
                };
583
                match map.get(&key) {
584
                    Some(&w) => 1.0 / (w as f64 + base),
585
                    None if prune_missing => f64::INFINITY,
586
                    None => 1.0 / base,
587
                }
588
            });
589
            paths.into_distances().into_iter().collect()
590
        })
591
    }
592

593
    /// Bulk-read a node's `rel_type` rels in `direction` as aligned arrays: the
594
    /// neighbor ids plus one Python list per requested property key (read from the
595
    /// rel column by CSR position, each column resolved once). The property-bearing
596
    /// bulk sibling of `neighbor_ids` — avoids the per-rel `Relationship` object and
597
    /// per-key `get_property` of `relationships()`. Returns `(neighbors, [values,
598
    /// ...])` aligned by index; a property absent on a rel is `None`.
599
    fn rels_with_props(
600
        &self,
601
        py: Python<'_>,
602
        node_id: u32,
603
        direction: Direction,
604
        rel_type: &str,
605
        prop_keys: Vec<String>,
606
    ) -> PyResult<PyObject> {
607
        use pyo3::types::PyList;
608
        use rustychickpeas_core::{BoolCol, ColumnDtype, Direction as CoreDir, F64Col, I64Col};
609
        enum H<'a> {
610
            I64(I64Col<'a>),
611
            F64(F64Col<'a>),
612
            Bool(BoolCol<'a>),
613
            None,
614
        }
615
        let snap = &self.snapshot;
616
        let dir: CoreDir = direction.into();
617
        let neighbors = PyList::empty(py);
618
        let cols: Vec<Bound<PyList>> = (0..prop_keys.len()).map(|_| PyList::empty(py)).collect();
619
        let rt = match snap.rel_type(rel_type) {
620
            Some(rt) if (node_id as usize) + 1 < snap.out_offsets.len() => rt,
621
            _ => return (neighbors, cols).into_py_any(py),
622
        };
623
        // Resolve each rel column to a typed reader once (not per rel).
624
        let hoisted: Vec<H> = prop_keys
625
            .iter()
626
            .map(|k| match snap.rel_col(k).map(|c| c.dtype()) {
627
                Some(ColumnDtype::I64) => H::I64(snap.rel_col(k).unwrap().i64()),
628
                Some(ColumnDtype::F64) => H::F64(snap.rel_col(k).unwrap().f64()),
629
                Some(ColumnDtype::Bool) => H::Bool(snap.rel_col(k).unwrap().bool()),
630
                _ => H::None,
631
            })
632
            .collect();
633
        let n = node_id as usize;
634
        let mut ranges: Vec<(usize, usize, bool)> = Vec::new();
635
        if matches!(dir, CoreDir::Outgoing | CoreDir::Both) {
636
            ranges.push((
637
                snap.out_offsets[n] as usize,
638
                snap.out_offsets[n + 1] as usize,
639
                true,
640
            ));
641
        }
642
        if matches!(dir, CoreDir::Incoming | CoreDir::Both) {
643
            ranges.push((
644
                snap.in_offsets[n] as usize,
645
                snap.in_offsets[n + 1] as usize,
646
                false,
647
            ));
648
        }
649
        for (s, e, outgoing) in ranges {
650
            for i in s..e {
651
                let (rtype, nbr, pos) = if outgoing {
652
                    (snap.out_types[i], snap.out_nbrs[i], i as u32)
653
                } else {
654
                    (
655
                        snap.in_types[i],
656
                        snap.in_nbrs[i],
657
                        snap.in_to_out.get(i).copied().unwrap_or(i as u32),
658
                    )
659
                };
660
                if rtype != rt {
661
                    continue;
662
                }
663
                neighbors.append(nbr)?;
664
                for (j, h) in hoisted.iter().enumerate() {
665
                    let v: PyObject = match h {
666
                        H::I64(c) => c.get(pos).into_py_any(py)?,
667
                        H::F64(c) => c.get(pos).into_py_any(py)?,
668
                        H::Bool(c) => c.get(pos).into_py_any(py)?,
669
                        H::None => py.None(),
670
                    };
671
                    cols[j].append(v)?;
672
                }
673
            }
674
        }
675
        (neighbors, cols).into_py_any(py)
676
    }
677

678
    /// Like `rels_with_props` but returns a `RelView` of zero-copy buffer arrays
679
    /// (`memoryview`-able, no per-value Python boxing): `.neighbors` (u32) and
680
    /// `.col(key)` (i64/f64). The native gather runs with the GIL released; a
681
    /// reduction like `sum(memoryview(v.col("amt")))` then runs at C speed. Missing
682
    /// props default to 0 (a typed buffer can't hold `None`).
683
    fn rel_view(
684
        &self,
685
        py: Python<'_>,
686
        node_id: u32,
687
        direction: Direction,
688
        rel_type: String,
689
        prop_keys: Vec<String>,
690
    ) -> RelView {
691
        use rustychickpeas_core::{ColumnDtype, Direction as CoreDir, F64Col, I64Col};
692
        enum KeyAcc<'a> {
693
            I64(I64Col<'a>, Vec<i64>),
694
            F64(F64Col<'a>, Vec<f64>),
695
            None(Vec<i64>),
696
        }
697
        let snapshot = self.snapshot.clone();
698
        let dir: CoreDir = direction.into();
699
        let n = node_id as usize;
700
        let (neighbors, cols): (Vec<u32>, Vec<(String, RelArrayData)>) =
701
            py.allow_threads(move || {
702
                let snap = &snapshot;
703
                let mut nbrs: Vec<u32> = Vec::new();
704
                let mut accs: Vec<KeyAcc> = prop_keys
705
                    .iter()
706
                    .map(|k| match snap.rel_col(k).map(|c| c.dtype()) {
707
                        Some(ColumnDtype::I64) => {
708
                            KeyAcc::I64(snap.rel_col(k).unwrap().i64(), Vec::new())
709
                        }
710
                        Some(ColumnDtype::F64) => {
711
                            KeyAcc::F64(snap.rel_col(k).unwrap().f64(), Vec::new())
712
                        }
713
                        _ => KeyAcc::None(Vec::new()),
714
                    })
715
                    .collect();
716
                let rt = snap.rel_type(&rel_type);
717
                if let (Some(rt), true) = (rt, n + 1 < snap.out_offsets.len()) {
718
                    let mut ranges: Vec<(usize, usize, bool)> = Vec::new();
719
                    if matches!(dir, CoreDir::Outgoing | CoreDir::Both) {
720
                        ranges.push((
721
                            snap.out_offsets[n] as usize,
722
                            snap.out_offsets[n + 1] as usize,
723
                            true,
724
                        ));
725
                    }
726
                    if matches!(dir, CoreDir::Incoming | CoreDir::Both) {
727
                        ranges.push((
728
                            snap.in_offsets[n] as usize,
729
                            snap.in_offsets[n + 1] as usize,
730
                            false,
731
                        ));
732
                    }
733
                    for (s, e, outgoing) in ranges {
734
                        for i in s..e {
735
                            let (rtype, nbr, pos) = if outgoing {
736
                                (snap.out_types[i], snap.out_nbrs[i], i as u32)
737
                            } else {
738
                                (
739
                                    snap.in_types[i],
740
                                    snap.in_nbrs[i],
741
                                    snap.in_to_out.get(i).copied().unwrap_or(i as u32),
742
                                )
743
                            };
744
                            if rtype != rt {
745
                                continue;
746
                            }
747
                            nbrs.push(nbr);
748
                            for acc in accs.iter_mut() {
749
                                match acc {
750
                                    KeyAcc::I64(c, v) => v.push(c.get(pos).unwrap_or(0)),
751
                                    KeyAcc::F64(c, v) => v.push(c.get(pos).unwrap_or(0.0)),
752
                                    KeyAcc::None(v) => v.push(0),
753
                                }
754
                            }
755
                        }
756
                    }
757
                }
758
                let cols = prop_keys
759
                    .into_iter()
760
                    .zip(accs)
761
                    .map(|(k, acc)| match acc {
762
                        KeyAcc::I64(_, v) => (k, RelArrayData::I64(v.into())),
763
                        KeyAcc::F64(_, v) => (k, RelArrayData::F64(v.into())),
764
                        KeyAcc::None(v) => (k, RelArrayData::I64(v.into())),
765
                    })
766
                    .collect();
767
                (nbrs, cols)
768
            });
769
        RelView {
770
            neighbors: neighbors.into(),
771
            cols,
772
        }
773
    }
774

775
    /// Build a `NeighborGroups` query over each source node's `rel_type` neighbors (in
776
    /// `direction`): group each source's neighbors by a projected attribute and
777
    /// reduce per source. Nothing runs until a terminal (`.sizes()` /
778
    /// `.top_by_size(...)`). E.g. BI Q4's biggest single-country membership per
779
    /// forum: ``g.neighbor_groups(forums, "hasMember", Direction.Outgoing)
780
    /// .project([(Direction.Outgoing, "isLocatedIn"), (Direction.Outgoing, "isPartOf")])
781
    /// .top_by_size(100, tie="flid")``.
782
    fn neighbor_groups(
783
        &self,
784
        sources: Vec<u32>,
785
        rel_type: String,
786
        direction: Direction,
787
    ) -> NeighborGroups {
788
        NeighborGroups {
789
            snapshot: self.snapshot.clone(),
790
            sources,
791
            rel_type,
792
            direction: direction.into(),
793
            project: Vec::new(),
794
        }
795
    }
796

797
    /// The string property `key` of `node_id`, or `None` when absent **or empty**
798
    /// (a dense string column stores a missing value as `""`).
799
    fn prop_str(&self, node_id: u32, key: &str) -> Option<String> {
800
        self.snapshot.prop_str(node_id, key).map(str::to_string)
801
    }
802

803
    /// All nodes within `min_hops..=max_hops` of `seed`, expanding only along
804
    /// `rel_type` in `direction` — the typed k-hop neighborhood as a list of ids
805
    /// (excludes `seed`). `min_hops` defaults to 1.
806
    #[pyo3(signature = (seed, direction, rel_type, max_hops, min_hops=1))]
807
    fn neighborhood(
808
        &self,
809
        seed: u32,
810
        direction: Direction,
811
        rel_type: &str,
812
        max_hops: u32,
813
        min_hops: u32,
814
    ) -> Vec<u32> {
815
        self.snapshot
816
            .neighborhood(seed, direction.into(), rel_type, min_hops..=max_hops)
817
            .iter()
818
            .collect()
819
    }
820

821
    /// The dense `i64` column `key` as a list (one value per node id), or `None`
822
    /// when the column is absent or not a dense `i64` column. Built on the
823
    /// zero-copy slice reader; the slice is copied to cross PyO3.
824
    fn i64_column(&self, key: &str) -> Option<Vec<i64>> {
825
        self.snapshot
826
            .col(key)
827
            .map(|c| c.i64())
828
            .and_then(|c| c.as_slice().map(<[i64]>::to_vec))
829
    }
830

831
    /// The interned id (code) for `s` in this snapshot, or `None` if `s` was never
832
    /// interned (so no node can carry it). Resolve filter targets to codes once,
833
    /// then compare them against a string [`Column`]'s codes (dtype `'string'`)
834
    /// over the buffer protocol — e.g. iterate `memoryview(g.column("lang"))`
835
    /// against the resolved code set. (numpy can consume the same buffer if you
836
    /// already use it, but is never required to filter.)
837
    fn string_id(&self, s: &str) -> Option<u32> {
838
        self.get_string_id(s)
839
    }
840

841
    /// A dense property column as a self-describing [`Column`] (its dtype is
842
    /// intrinsic — no `.i64()` narrowing), or `None` when the key is absent or the
843
    /// column is not stored densely. The `Column` supports the buffer protocol, so
844
    /// `memoryview(col)` reads it zero-copy and `col.to_pylist()` gives a plain
845
    /// Python list; any buffer consumer (e.g. `numpy.asarray(col)` /
846
    /// `pyarrow.py_buffer(col)`) works too, but is never required to filter.
847
    fn column(&self, key: &str) -> Option<Column> {
848
        Column::build(self.snapshot.clone(), key)
849
    }
850

851
    /// Low-level grouped reduction over dense `i64` node columns, run in Rust with
852
    /// the GIL released. Prefer the fluent [`GraphSnapshot::aggregate`] builder —
853
    /// this is the kernel it calls, kept public for direct use.
854
    ///
855
    /// Scans the nodes of each label in `labels`. A row counts toward `total` when
856
    /// it passes every `pre_filters` predicate `(column, op, value)` (op ∈
857
    /// `<,<=,>,>=,==,!=`); rows additionally passing every `group_filters` predicate
858
    /// are grouped. The group key is the label index (when `group_label`), then each
859
    /// `group_cols` value, then a bucket index per `group_bins` `(column, bounds)`
860
    /// (bucket = count of `bounds <= value`). Returns `(rows, total)` with
861
    /// `rows = [(key_tuple, count, sum), ...]`. All referenced columns must be dense
862
    /// `i64` columns; a missing label or non-dense/-i64 column raises `ValueError`.
863
    #[pyo3(signature = (labels, pre_filters=vec![], group_filters=vec![], group_label=false, group_cols=vec![], group_bins=vec![], sum_col=None))]
864
    #[allow(clippy::too_many_arguments, clippy::type_complexity)]
865
    fn group_reduce(
866
        &self,
867
        py: Python<'_>,
868
        labels: Vec<String>,
869
        pre_filters: Vec<(String, String, i64)>,
870
        group_filters: Vec<(String, String, i64)>,
871
        group_label: bool,
872
        group_cols: Vec<String>,
873
        group_bins: Vec<(String, Vec<i64>)>,
874
        sum_col: Option<String>,
875
    ) -> PyResult<(Vec<(Vec<i64>, u64, i64)>, u64)> {
876
        let mut group: Vec<GroupSpec> = group_cols.into_iter().map(GroupSpec::Col).collect();
877
        group.extend(group_bins.into_iter().map(|(c, e)| GroupSpec::Bin(c, e)));
878
        let agg = build_core_agg(
879
            &self.snapshot,
880
            &labels,
881
            &pre_filters,
882
            &group_filters,
883
            group_label,
884
            &group,
885
            sum_col.as_deref(),
886
            None,
887
            None,
888
            &[],
889
        )?;
890
        let res = py
891
            .allow_threads(|| agg.run())
892
            .map_err(|e| PyErr::new::<pyo3::exceptions::PyValueError, _>(e.to_string()))?;
893
        let rows = res
894
            .rows
895
            .into_iter()
896
            .map(|r| (r.key, r.count, r.sum))
897
            .collect();
898
        Ok((rows, res.total))
899
    }
900

901
    /// Start a fluent aggregation over the given node labels — the Pythonic front
902
    /// for [`group_reduce`](Self::group_reduce). Chain `.where(col, op, value)` /
903
    /// `.having(col, op, value)` / `.by(col)` / `.bin(col, bounds)` / `.by_label()` /
904
    /// `.sum(col)`, then `.run()` for a result with `.total` and self-describing dict
905
    /// `.rows` (the source label comes back as its name). The heavy scan runs in Rust
906
    /// with the GIL released — no numpy/pyarrow needed.
907
    #[pyo3(signature = (*labels))]
908
    fn aggregate(&self, labels: Vec<String>) -> Aggregation {
909
        Aggregation {
910
            snapshot: self.snapshot.clone(),
911
            labels,
912
            where_filters: Vec::new(),
913
            having_filters: Vec::new(),
914
            by_label: false,
915
            group: Vec::new(),
916
            sum_col: None,
917
            through: None,
918
            neighbor_filter: None,
919
            projected_filters: Vec::new(),
920
        }
921
    }
922

923
    /// Weighted (or unweighted) shortest-path cost from `source` to `target`, or
924
    /// `None` if `target` is unreachable.
925
    ///
926
    /// With `weight_property`, each followed rel costs that f64/i64 rel
927
    /// property (an rel that lacks it is skipped); without it, every rel costs
928
    /// 1.0 (a hop count). `rel_types` optionally restricts which relationship
929
    /// types are followed (all types when `None`). Weights must be non-negative;
930
    /// the search is a bidirectional Dijkstra.
931
    #[pyo3(signature = (source, target, direction=Direction::Both, rel_types=None, weight_property=None))]
932
    fn shortest_path(
933
        &self,
934
        py: Python<'_>,
935
        source: u32,
936
        target: u32,
937
        direction: Direction,
938
        rel_types: Option<Vec<String>>,
939
        weight_property: Option<String>,
940
    ) -> PyResult<Option<f64>> {
941
        if source >= self.snapshot.n_nodes || target >= self.snapshot.n_nodes {
942
            return Err(PyErr::new::<pyo3::exceptions::PyValueError, _>(format!(
943
                "Node ID out of range (max: {})",
944
                self.snapshot.n_nodes.saturating_sub(1)
945
            )));
946
        }
947
        let types: Vec<&str> = rel_types
948
            .as_ref()
949
            .map(|t| t.iter().map(|s| s.as_str()).collect())
950
            .unwrap_or_default();
951
        let snapshot = self.snapshot.clone();
952
        // No Python is called during the search, so release the GIL.
953
        let cost = py.allow_threads(move || {
954
            let weight = |_from: u32, rel: &RelationshipRef| -> f64 {
955
                match &weight_property {
956
                    Some(prop) => match snapshot.rel_prop(rel.pos, prop).map(|p| p.value()) {
957
                        Some(ValueId::F64(bits)) => f64::from_bits(bits),
958
                        Some(ValueId::I64(w)) => w as f64,
959
                        _ => f64::INFINITY,
960
                    },
961
                    None => 1.0,
962
                }
963
            };
964
            snapshot.weighted_shortest_path(source, target, direction.into(), &types[..], weight)
965
        });
966
        Ok(cost)
967
    }
968

969
    /// Every relationship of type `rel_type`, as [`Relationship`] objects (each
970
    /// listed once, from its outgoing side). Raises `ValueError` if `rel_type` is
971
    /// not a known relationship type in this snapshot.
972
    fn relationships_with_type(&self, rel_type: String) -> PyResult<Vec<Relationship>> {
973
        let rel_type_id = self.rel_type_from_str(&rel_type).ok_or_else(|| {
974
            PyErr::new::<pyo3::exceptions::PyValueError, _>(format!(
975
                "Relationship type '{}' not found",
976
                rel_type
977
            ))
978
        })?;
979

980
        if let Some(bitmap) = self.snapshot.type_index.get(&rel_type_id) {
981
            let relationships: Vec<Relationship> = bitmap
982
                .iter()
983
                .map(|idx| Relationship {
984
                    snapshot: self.snapshot.clone(),
985
                    rel_index: idx,
986
                    is_outgoing: true,
987
                })
988
                .collect();
989
            Ok(relationships)
990
        } else {
991
            Ok(Vec::new())
992
        }
993
    }
994

995
    /// All node ids that carry any data — a label, a relationship, or a property —
996
    /// as a list. Ids that were never assigned data are omitted.
997
    fn all_nodes(&self) -> PyResult<Vec<u32>> {
998
        use std::collections::HashSet;
999
        let mut nodes = HashSet::new();
1000

1001
        // Add nodes with labels
1002
        for (_label, node_set) in &self.snapshot.label_index {
1003
            for node_id in node_set.iter() {
1004
                nodes.insert(node_id);
1005
            }
1006
        }
1007

1008
        // Add nodes with rels (check CSR arrays)
1009
        // Nodes with outgoing rels
1010
        for node_id in 0..self.snapshot.out_offsets.len().saturating_sub(1) {
1011
            let start = self.snapshot.out_offsets[node_id] as usize;
1012
            let end = self.snapshot.out_offsets[node_id + 1] as usize;
1013
            if start < end {
1014
                nodes.insert(node_id as u32);
1015
            }
1016
        }
1017

1018
        // Nodes with incoming rels
1019
        for node_id in 0..self.snapshot.in_offsets.len().saturating_sub(1) {
1020
            let start = self.snapshot.in_offsets[node_id] as usize;
1021
            let end = self.snapshot.in_offsets[node_id + 1] as usize;
1022
            if start < end {
1023
                nodes.insert(node_id as u32);
1024
            }
1025
        }
1026

1027
        // Add nodes with properties
1028
        for column in self.snapshot.columns.values() {
1029
            match column {
1030
                rustychickpeas_core::Column::DenseI64(_)
1031
                | rustychickpeas_core::Column::DenseF64(_)
1032
                | rustychickpeas_core::Column::DenseBool(_)
1033
                | rustychickpeas_core::Column::DenseStr(_) => {
1034
                    // Dense columns: all nodes from 0 to n_nodes-1 have this property
1035
                    // But we only want nodes that actually have data, so skip dense columns
1036
                    // (they're dense because most nodes have the property, but we can't tell which ones)
1037
                }
1038
                rustychickpeas_core::Column::SparseI64(pairs) => {
1039
                    for (node_id, _) in pairs {
1040
                        nodes.insert(*node_id);
1041
                    }
1042
                }
1043
                rustychickpeas_core::Column::SparseF64(pairs) => {
1044
                    for (node_id, _) in pairs {
1045
                        nodes.insert(*node_id);
1046
                    }
1047
                }
1048
                rustychickpeas_core::Column::SparseBool(pairs) => {
1049
                    for (node_id, _) in pairs {
1050
                        nodes.insert(*node_id);
1051
                    }
1052
                }
1053
                rustychickpeas_core::Column::SparseStr(pairs) => {
1054
                    for (node_id, _) in pairs {
1055
                        nodes.insert(*node_id);
1056
                    }
1057
                }
1058
                rustychickpeas_core::Column::RankI64 { present, .. }
1059
                | rustychickpeas_core::Column::RankF64 { present, .. }
1060
                | rustychickpeas_core::Column::RankBool { present, .. }
1061
                | rustychickpeas_core::Column::RankStr { present, .. } => {
1062
                    for pos in present.iter_ones() {
1063
                        nodes.insert(pos as u32);
1064
                    }
1065
                }
1066
            }
1067
        }
1068

1069
        let mut result: Vec<u32> = nodes.into_iter().collect();
1070
        result.sort_unstable();
1071
        Ok(result)
1072
    }
1073

1074
    /// Every relationship in the snapshot as [`Relationship`] objects, each listed
1075
    /// once (from its outgoing side).
1076
    fn all_relationships(&self) -> PyResult<Vec<Relationship>> {
1077
        let mut relationships = Vec::with_capacity(self.snapshot.out_nbrs.len());
1078

1079
        // Iterate through all outgoing relationships (each relationship appears once)
1080
        for idx in 0..self.snapshot.out_nbrs.len() {
1081
            relationships.push(Relationship {
1082
                snapshot: self.snapshot.clone(),
1083
                rel_index: idx as u32,
1084
                is_outgoing: true,
1085
            });
1086
        }
1087

1088
        Ok(relationships)
1089
    }
1090

1091
    /// Get a relationship by index
1092
    ///
1093
    /// Uses the outgoing relationship index (canonical index in out_nbrs).
1094
    /// Each relationship appears once in out_nbrs, so this is a unique identifier.
1095
    ///
1096
    /// # Arguments
1097
    /// * `rel_index` - The relationship index in out_nbrs (0 to n_rels-1)
1098
    fn relationship(&self, rel_index: u32) -> PyResult<Relationship> {
1099
        let max_index = self.snapshot.out_nbrs.len() as u32;
1100

1101
        if rel_index >= max_index {
1102
            return Err(PyErr::new::<pyo3::exceptions::PyValueError, _>(format!(
1103
                "Relationship index {} out of range (max: {})",
1104
                rel_index,
1105
                max_index.saturating_sub(1)
1106
            )));
1107
        }
1108

1109
        Ok(Relationship {
1110
            snapshot: self.snapshot.clone(),
1111
            rel_index,
1112
            is_outgoing: true, // Always true since we use out_nbrs as canonical
1113
        })
1114
    }
1115

1116
    /// Get a relationship by node pair
1117
    ///
1118
    /// Finds a relationship between two nodes. If multiple relationships exist
1119
    /// between the same nodes, returns the first one found.
1120
    ///
1121
    /// # Arguments
1122
    /// * `start_node` - Source node ID
1123
    /// * `end_node` - Destination node ID
1124
    fn relationship_by_nodes(
1125
        &self,
1126
        start_node: u32,
1127
        end_node: u32,
1128
    ) -> PyResult<Option<Relationship>> {
1129
        // Check if start_node is valid
1130
        if start_node as usize >= self.snapshot.out_offsets.len().saturating_sub(1) {
1131
            return Ok(None);
1132
        }
1133

1134
        let start = self.snapshot.out_offsets[start_node as usize] as usize;
1135
        let end = self.snapshot.out_offsets[start_node as usize + 1] as usize;
1136

1137
        // Search for end_node in the outgoing neighbors of start_node
1138
        for (idx, &nbr) in self.snapshot.out_nbrs[start..end].iter().enumerate() {
1139
            if nbr == end_node {
1140
                return Ok(Some(Relationship {
1141
                    snapshot: self.snapshot.clone(),
1142
                    rel_index: (start + idx) as u32,
1143
                    is_outgoing: true,
1144
                }));
1145
            }
1146
        }
1147

1148
        Ok(None)
1149
    }
1150

1151
    /// The value of node property `key`, or `None` when the node has no such
1152
    /// property — including an unknown key (a lookup, like `dict.get`, not an
1153
    /// error). Out-of-range node ids likewise have no properties.
1154
    fn get_property(&self, node_id: u32, key: String) -> PyResult<Option<PyObject>> {
1155
        let value_id = self.snapshot.prop(node_id, &key).map(|p| p.value());
1156

1157
        Python::with_gil(|py| {
1158
            Ok(value_id.and_then(|vid| value_id_to_pyobject(py, vid, &self.snapshot.atoms)))
1159
        })
1160
    }
1161

1162
    /// Get nodes with a specific property value, scoped by label
1163
    ///
1164
    /// # Arguments
1165
    /// * `label` - The label to scope the query to
1166
    /// * `key` - The property key
1167
    /// * `value` - The property value to search for
1168
    #[pyo3(signature = (label, key, value))]
1169
    fn nodes_with_property(
1170
        &self,
1171
        label: String,
1172
        key: String,
1173
        value: &Bound<'_, PyAny>,
1174
    ) -> PyResult<Vec<u32>> {
1175
        // Check if label exists - need to do this before calling nodes_with_property()
1176
        // because it returns None for both "label doesn't exist" and "no matches"
1177
        let label_id = self.label_from_str(&label);
1178
        if label_id.is_none() {
1179
            return Err(PyErr::new::<pyo3::exceptions::PyValueError, _>(format!(
1180
                "Label '{}' not found",
1181
                label
1182
            )));
1183
        }
1184

1185
        // Check if property key exists
1186
        let key_id = self.property_key_from_str(&key);
1187
        if key_id.is_none() {
1188
            return Err(PyErr::new::<pyo3::exceptions::PyValueError, _>(format!(
1189
                "Property key '{}' not found",
1190
                key
1191
            )));
1192
        }
1193

1194
        // Both label and key exist, now convert value and query
1195
        let prop_value = py_to_property_value(value)?;
1196
        let value_id = match prop_value {
1197
            rustychickpeas_core::PropertyValue::String(s) => {
1198
                // Need to find string ID
1199
                let sid = self.get_string_id(&s).ok_or_else(|| {
1200
                    PyErr::new::<pyo3::exceptions::PyValueError, _>(format!(
1201
                        "Property value string '{}' not found",
1202
                        s
1203
                    ))
1204
                })?;
1205
                ValueId::Str(sid)
1206
            }
1207
            rustychickpeas_core::PropertyValue::Integer(i) => ValueId::I64(i),
1208
            rustychickpeas_core::PropertyValue::Float(f) => ValueId::from_f64(f),
1209
            rustychickpeas_core::PropertyValue::Boolean(b) => ValueId::Bool(b),
1210
            rustychickpeas_core::PropertyValue::InternedString(_) => {
1211
                return Err(PyErr::new::<pyo3::exceptions::PyValueError, _>(
1212
                    "InternedString not supported in GraphSnapshot queries",
1213
                ));
1214
            }
1215
        };
1216

1217
        // get_nodes_with_property now returns Option<NodeSet> (cloned) instead of Option<&NodeSet>
1218
        // None means no matches (valid), Some means matches found
1219
        if let Some(node_set) = self.snapshot.nodes_with_property(&label, &key, value_id) {
1220
            Ok(node_set.iter().collect())
1221
        } else {
1222
            Ok(Vec::new())
1223
        }
1224
    }
1225

1226
    /// Full-text search: node ids of `label` whose `key` string property contains
1227
    /// every whitespace/punctuation-delimited token in `query` (lowercased,
1228
    /// boolean AND), via the core lazily-built inverted index. Returns the node
1229
    /// ids ascending; empty for an unknown label/key, an empty query, or a token
1230
    /// no document contains. Wraps `GraphSnapshot::full_text_search`.
1231
    fn full_text_search(&self, label: &str, key: &str, query: &str) -> Vec<u32> {
1232
        self.snapshot
1233
            .full_text_search(label, key, query)
1234
            .iter()
1235
            .collect()
1236
    }
1237

1238
    /// Geo search: node ids of `label` whose `(lat_key, lon_key)` coordinates fall
1239
    /// within `km` great-circle kilometres of `(lat, lon)`, via the core geo k-d
1240
    /// tree. Returns the node ids ascending. Wraps
1241
    /// `GraphSnapshot::geo_within_radius`.
1242
    fn geo_within_radius(
1243
        &self,
1244
        label: &str,
1245
        lat_key: &str,
1246
        lon_key: &str,
1247
        lat: f64,
1248
        lon: f64,
1249
        km: f64,
1250
    ) -> Vec<u32> {
1251
        self.snapshot
1252
            .geo_within_radius(label, lat_key, lon_key, lat, lon, km)
1253
            .iter()
1254
            .collect()
1255
    }
1256

1257
    /// Geo search: node ids of `label` whose `(lat_key, lon_key)` coordinates fall
1258
    /// in the lat/lon rectangle with corners `(min_lat, min_lon)` and
1259
    /// `(max_lat, max_lon)` (a `min_lon > max_lon` box crosses the antimeridian),
1260
    /// via the core geo k-d tree. Returns the node ids ascending. Wraps
1261
    /// `GraphSnapshot::geo_within_bbox`.
1262
    #[allow(clippy::too_many_arguments)]
1263
    fn geo_within_bbox(
1264
        &self,
1265
        label: &str,
1266
        lat_key: &str,
1267
        lon_key: &str,
1268
        min_lat: f64,
1269
        min_lon: f64,
1270
        max_lat: f64,
1271
        max_lon: f64,
1272
    ) -> Vec<u32> {
1273
        self.snapshot
1274
            .geo_within_bbox(
1275
                label,
1276
                lat_key,
1277
                lon_key,
1278
                (min_lat, min_lon),
1279
                (max_lat, max_lon),
1280
            )
1281
            .iter()
1282
            .collect()
1283
    }
1284

1285
    /// PageRank after `iterations` synchronous pull updates with damping `damping`
1286
    /// (sinks redistribute their rank uniformly). `directed` picks the forward
1287
    /// direction (outgoing for a directed graph, both for undirected). Returns one
1288
    /// score per node id. Runs in Rust with the GIL released. Defaults
1289
    /// (`directed=True, damping=0.85, iterations=30`) make `g.pagerank()` the
1290
    /// common case. Wraps `GraphSnapshot::pagerank`.
1291
    #[pyo3(signature = (directed=true, damping=0.85, iterations=30))]
1292
    fn pagerank(&self, py: Python<'_>, directed: bool, damping: f64, iterations: u32) -> Vec<f64> {
1293
        let snapshot = self.snapshot.clone();
1294
        py.allow_threads(move || snapshot.pagerank(directed, damping, iterations))
1295
    }
1296

1297
    /// Weakly connected components: each node's label is the smallest node id in its
1298
    /// component (flood undirected rels). One label per node id. GIL released.
1299
    /// Wraps `GraphSnapshot::wcc`.
1300
    fn wcc(&self, py: Python<'_>) -> Vec<u32> {
1301
        let snapshot = self.snapshot.clone();
1302
        py.allow_threads(move || snapshot.wcc())
1303
    }
1304

1305
    /// Community detection by `iterations` rounds of synchronous label propagation
1306
    /// (most-frequent neighbour label, smallest on a tie; in+out counted separately
1307
    /// for a directed graph). `seed` gives explicit initial labels per node id
1308
    /// (default: node ids) — pass original vertex ids to match a vertex-id-keyed
1309
    /// reference. One label per node id. GIL released. Defaults
1310
    /// (`directed=True, iterations=10`) make `g.cdlp()` the common case.
1311
    /// Wraps `GraphSnapshot::cdlp` / `cdlp_seeded`.
1312
    #[pyo3(signature = (directed=true, iterations=10, seed=None))]
1313
    fn cdlp(
1314
        &self,
1315
        py: Python<'_>,
1316
        directed: bool,
1317
        iterations: u32,
1318
        seed: Option<Vec<u32>>,
1319
    ) -> Vec<u32> {
1320
        let snapshot = self.snapshot.clone();
1321
        py.allow_threads(move || match seed {
1322
            Some(s) => snapshot.cdlp_seeded(directed, iterations, &s),
1323
            None => snapshot.cdlp(directed, iterations),
1324
        })
1325
    }
1326

1327
    /// Local clustering coefficient per node: rels among each node's undirected
1328
    /// neighbour set over the maximum possible (0 when degree <= 1). `directed`
1329
    /// picks the forward direction for the rel count. One value per node id. GIL
1330
    /// released. `directed` defaults to `True`. Wraps `GraphSnapshot::lcc`.
1331
    #[pyo3(signature = (directed=true))]
1332
    fn lcc(&self, py: Python<'_>, directed: bool) -> Vec<f64> {
1333
        let snapshot = self.snapshot.clone();
1334
        py.allow_threads(move || snapshot.lcc(directed))
1335
    }
1336

1337
    /// Single-source shortest paths from `source` over forward rels with additive
1338
    /// weights from the `weight_key` rel property (`None` = unit weights);
1339
    /// unreachable nodes get `inf`. One distance per node id. GIL released.
1340
    /// `directed` defaults to `True`. Wraps `GraphSnapshot::sssp`.
1341
    #[pyo3(signature = (source, directed=true, weight_key=None))]
1342
    fn sssp(
1343
        &self,
1344
        py: Python<'_>,
1345
        source: u32,
1346
        directed: bool,
1347
        weight_key: Option<String>,
1348
    ) -> Vec<f64> {
1349
        let snapshot = self.snapshot.clone();
1350
        py.allow_threads(move || snapshot.sssp(source, directed, weight_key.as_deref()))
1351
    }
1352

1353
    /// Seeded co-occurrence — one-mode / bipartite projection by shared neighbour.
1354
    /// From `seed`, over relationship `rel_type`, the nodes that share a `rel_type`-neighbour
1355
    /// with `seed` (seed -> shared centers -> their other `rel_type`-neighbours), `seed`
1356
    /// excluded. `weight="count"` (default) weighs each co-occurring node by its
1357
    /// shared-center count; `weight="distinct"` (with `distinct_key`) by the number
1358
    /// of distinct values of that property over the shared centers (e.g. distinct
1359
    /// days). Returns `{other: weight}`. GIL released.
1360
    /// Wraps `GraphSnapshot::co_occurring`.
1361
    #[pyo3(signature = (seed, rel_type, direction, weight=None, distinct_key=None))]
1362
    fn co_occurring(
1363
        &self,
1364
        py: Python<'_>,
1365
        seed: u32,
1366
        rel_type: String,
1367
        direction: Direction,
1368
        weight: Option<String>,
1369
        distinct_key: Option<String>,
1370
    ) -> PyResult<std::collections::HashMap<u32, u64>> {
1371
        // Validate the weight mode up front (under the GIL), then run released.
1372
        let mode = match weight.as_deref() {
1373
            None | Some("count") => None,
1374
            Some("distinct") => Some(distinct_key.ok_or_else(|| {
1375
                PyErr::new::<pyo3::exceptions::PyValueError, _>(
1376
                    "weight='distinct' requires distinct_key",
1377
                )
1378
            })?),
1379
            Some(other) => {
1380
                return Err(PyErr::new::<pyo3::exceptions::PyValueError, _>(format!(
1381
                    "unknown weight '{}' (use 'count' or 'distinct')",
1382
                    other
1383
                )))
1384
            }
1385
        };
1386
        let snapshot = self.snapshot.clone();
1387
        let dir: rustychickpeas_core::Direction = direction.into();
1388
        let out: std::collections::HashMap<u32, u64> = py.allow_threads(move || {
1389
            let w = match &mode {
1390
                None => rustychickpeas_core::CoWeight::Count,
1391
                Some(k) => rustychickpeas_core::CoWeight::Distinct(k.as_str()),
1392
            };
1393
            // core returns a hashbrown map; collect into std so PyO3 hands back a dict.
1394
            snapshot
1395
                .co_occurring(seed, rel_type.as_str(), dir, w)
1396
                .into_iter()
1397
                .collect()
1398
        });
1399
        Ok(out)
1400
    }
1401

1402
    /// Get the version of this snapshot
1403
    fn version(&self) -> PyResult<Option<String>> {
1404
        Ok(self.snapshot.version().map(|s| s.to_string()))
1405
    }
1406

1407
    /// Create a GraphSnapshot from Parquet files using GraphBuilder
1408
    #[staticmethod]
1409
    #[allow(clippy::too_many_arguments)]
1410
    #[pyo3(signature = (nodes_path=None, relationships_path=None, node_id_column=None, label_columns=None, node_property_columns=None, start_node_column=None, end_node_column=None, rel_type_column=None, rel_property_columns=None))]
1411
    fn read_from_parquet(
1412
        nodes_path: Option<String>,
1413
        relationships_path: Option<String>,
1414
        node_id_column: Option<String>,
1415
        label_columns: Option<Vec<String>>,
1416
        node_property_columns: Option<Vec<String>>,
1417
        start_node_column: Option<String>,
1418
        end_node_column: Option<String>,
1419
        rel_type_column: Option<String>,
1420
        rel_property_columns: Option<Vec<String>>,
1421
    ) -> PyResult<GraphSnapshot> {
1422
        let label_cols = label_columns
1423
            .as_ref()
1424
            .map(|cols| cols.iter().map(|s| s.as_str()).collect());
1425
        let node_prop_cols = node_property_columns
1426
            .as_ref()
1427
            .map(|cols| cols.iter().map(|s| s.as_str()).collect());
1428
        let rel_prop_cols = rel_property_columns
1429
            .as_ref()
1430
            .map(|cols| cols.iter().map(|s| s.as_str()).collect());
1431

1432
        let snapshot = CoreGraphSnapshot::from_parquet(
1433
            nodes_path.as_deref(),
1434
            relationships_path.as_deref(),
1435
            node_id_column.as_deref(),
1436
            label_cols,
1437
            node_prop_cols,
1438
            start_node_column.as_deref(),
1439
            end_node_column.as_deref(),
1440
            rel_type_column.as_deref(),
1441
            rel_prop_cols,
1442
        )
1443
        .map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;
1444

1445
        Ok(GraphSnapshot::new(snapshot))
1446
    }
1447

1448
    /// Write this snapshot to an RCPG file on disk. With `topology_only=True`,
1449
    /// omit the property columns for a lean, traversal-only file (per-node data
1450
    /// is expected to live in a record store instead).
1451
    #[pyo3(signature = (path, topology_only=false))]
1452
    fn write_rcpg(&self, path: String, topology_only: bool) -> PyResult<()> {
1453
        if topology_only {
1454
            use rustychickpeas_core::format::rcpg::WriteOptions;
1455
            use std::io::Write;
1456
            let mut file = std::io::BufWriter::new(
1457
                std::fs::File::create(&path)
1458
                    .map_err(|e| PyErr::new::<pyo3::exceptions::PyIOError, _>(e.to_string()))?,
1459
            );
1460
            self.snapshot
1461
                .write_rcpg_with(&mut file, &WriteOptions::topology_only())
1462
                .map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;
1463
            file.flush()
1464
                .map_err(|e| PyErr::new::<pyo3::exceptions::PyIOError, _>(e.to_string()))?;
1465
        } else {
1466
            self.snapshot
1467
                .write_rcpg_file(&path)
1468
                .map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;
1469
        }
1470
        Ok(())
1471
    }
1472

1473
    /// Read a snapshot from an RCPG file on disk (the property index rebuilds
1474
    /// lazily on first use).
1475
    #[staticmethod]
1476
    fn read_rcpg(path: String) -> PyResult<GraphSnapshot> {
1477
        let snapshot = CoreGraphSnapshot::read_rcpg_file(&path)
1478
            .map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;
1479
        Ok(GraphSnapshot::new(snapshot))
1480
    }
1481

1482
    /// Bidirectional BFS to find paths between source and target node sets
1483
    ///
1484
    /// Performs BFS from both source and target nodes simultaneously, meeting in the middle.
1485
    /// Returns the intersection of nodes and relationships that lie on paths between the sets.
1486
    ///
1487
    /// # Arguments
1488
    /// * `source_nodes` - List of starting node IDs for forward traversal
1489
    /// * `target_nodes` - List of starting node IDs for backward traversal
1490
    /// * `direction` - Direction of traversal (Direction.Outgoing, Direction.Incoming, or Direction.Both)
1491
    ///   - Outgoing: Forward search uses outgoing rels, backward uses incoming (default for finding paths from source to target)
1492
    ///   - Incoming: Forward search uses incoming rels, backward uses outgoing (reverse direction)
1493
    ///   - Both: Both searches use both directions (bidirectional traversal)
1494
    /// * `rel_types` - Optional list of relationship type names to filter by
1495
    /// * `node_filter` - Optional callable that takes (node_id: int) and returns bool.
1496
    ///   Returns True to include/continue from a node.
1497
    /// * `rel_filter` - Optional callable that takes (from_node: int, to_node: int, rel_type: str, csr_pos: int) and returns bool.
1498
    ///   Returns True to follow a relationship.
1499
    /// * `max_depth` - Optional maximum depth for each direction (default: no limit)
1500
    ///
1501
    /// # Returns
1502
    /// A tuple `(node_ids, rel_csr_positions)` where:
1503
    /// - `node_ids`: List of node IDs on paths between source and target
1504
    /// - `rel_csr_positions`: List of relationship CSR positions on paths between source and target
1505
    ///
1506
    /// # Examples
1507
    /// ```python
1508
    /// from rustychickpeas import Direction
1509
    ///
1510
    /// # Simple bidirectional search (default: Outgoing)
1511
    /// nodes, rels = snapshot.bidirectional_bfs([0, 1], [10, 11], Direction.Outgoing)
1512
    ///
1513
    /// # With relationship type filter
1514
    /// nodes, rels = snapshot.bidirectional_bfs(
1515
    ///     [0, 1], [10, 11],
1516
    ///     Direction.Outgoing,
1517
    ///     rel_types=["KNOWS", "WORKS_WITH"]
1518
    /// )
1519
    ///
1520
    /// # Bidirectional traversal (both directions)
1521
    /// nodes, rels = snapshot.bidirectional_bfs(
1522
    ///     [0, 1], [10, 11],
1523
    ///     Direction.Both
1524
    /// )
1525
    ///
1526
    /// # With node filter (only "Person" nodes)
1527
    /// def node_filter(node_id):
1528
    ///     return "Person" in snapshot.node_labels(node_id)
1529
    ///
1530
    /// nodes, rels = snapshot.bidirectional_bfs(
1531
    ///     [0, 1], [10, 11],
1532
    ///     Direction.Outgoing,
1533
    ///     node_filter=node_filter
1534
    /// )
1535
    /// ```
1536
    #[pyo3(signature = (source_nodes, target_nodes, direction, *, rel_types=None, node_filter=None, rel_filter=None, max_depth=None))]
1537
    #[allow(clippy::too_many_arguments)]
1538
    fn bidirectional_bfs(
1539
        &self,
1540
        py: Python<'_>,
1541
        source_nodes: Vec<u32>,
1542
        target_nodes: Vec<u32>,
1543
        direction: Direction,
1544
        rel_types: Option<Vec<String>>,
1545
        node_filter: Option<PyObject>,
1546
        rel_filter: Option<PyObject>,
1547
        max_depth: Option<u32>,
1548
    ) -> PyResult<(Vec<u32>, Vec<u32>)> {
1549
        let source_set = NodeSet::from(RoaringBitmap::from_iter(source_nodes.iter().copied()));
1550
        let target_set = NodeSet::from(RoaringBitmap::from_iter(target_nodes.iter().copied()));
1551

1552
        use rustychickpeas_core::types::Direction as CoreDirection;
1553
        let rust_direction = match direction {
1554
            Direction::Outgoing => CoreDirection::Outgoing,
1555
            Direction::Incoming => CoreDirection::Incoming,
1556
            Direction::Both => CoreDirection::Both,
1557
        };
1558

1559
        let rel_types_str: Option<Vec<&str>> = rel_types
1560
            .as_ref()
1561
            .map(|types| types.iter().map(|s| s.as_str()).collect());
1562

1563
        // Error cell to capture Python exceptions from filter callbacks
1564
        let error_cell: Arc<Mutex<Option<PyErr>>> = Arc::new(Mutex::new(None));
1565

1566
        // Release the GIL for the traversal; filter callbacks re-acquire it
1567
        // per invocation via Python::with_gil.
1568
        let (node_bitmap, rel_bitmap) = py.allow_threads(|| {
1569
            if let (Some(nf_obj), Some(rf_obj)) = (node_filter.as_ref(), rel_filter.as_ref()) {
1570
                let nf_err = error_cell.clone();
1571
                let rf_err = error_cell.clone();
1572
                self.snapshot.bidirectional_bfs(
1573
                    &source_set,
1574
                    &target_set,
1575
                    rust_direction,
1576
                    rel_types_str.as_deref(),
1577
                    Some(move |node_id: u32, _snapshot: &CoreGraphSnapshot| -> bool {
1578
                        if nf_err
1579
                            .lock()
1580
                            .unwrap_or_else(PoisonError::into_inner)
1581
                            .is_some()
1582
                        {
1583
                            return false;
1584
                        }
1585
                        Python::with_gil(|py| {
1586
                            match nf_obj
1587
                                .call1(py, (node_id,))
1588
                                .and_then(|r| r.extract::<bool>(py))
1589
                            {
1590
                                Ok(v) => v,
1591
                                Err(e) => {
1592
                                    *nf_err.lock().unwrap_or_else(PoisonError::into_inner) =
1593
                                        Some(e);
1594
                                    false
1595
                                }
1596
                            }
1597
                        })
1598
                    }),
1599
                    Some(
1600
                        move |from: u32,
1601
                              to: u32,
1602
                              rel_type: RelationshipType,
1603
                              csr_pos: u32,
1604
                              snapshot: &CoreGraphSnapshot|
1605
                              -> bool {
1606
                            if rf_err
1607
                                .lock()
1608
                                .unwrap_or_else(PoisonError::into_inner)
1609
                                .is_some()
1610
                            {
1611
                                return false;
1612
                            }
1613
                            Python::with_gil(|py| {
1614
                                let rel_type_str = snapshot
1615
                                    .resolve_string(rel_type.id())
1616
                                    .unwrap_or("")
1617
                                    .to_string();
1618
                                match rf_obj
1619
                                    .call1(py, (from, to, rel_type_str, csr_pos))
1620
                                    .and_then(|r| r.extract::<bool>(py))
1621
                                {
1622
                                    Ok(v) => v,
1623
                                    Err(e) => {
1624
                                        *rf_err.lock().unwrap_or_else(PoisonError::into_inner) =
1625
                                            Some(e);
1626
                                        false
1627
                                    }
1628
                                }
1629
                            })
1630
                        },
1631
                    ),
1632
                    max_depth,
1633
                )
1634
            } else if let Some(nf_obj) = node_filter.as_ref() {
1635
                let nf_err = error_cell.clone();
1636
                type RelFilter = fn(u32, u32, RelationshipType, u32, &CoreGraphSnapshot) -> bool;
1637
                self.snapshot.bidirectional_bfs::<_, RelFilter>(
1638
                    &source_set,
1639
                    &target_set,
1640
                    rust_direction,
1641
                    rel_types_str.as_deref(),
1642
                    Some(move |node_id: u32, _snapshot: &CoreGraphSnapshot| -> bool {
1643
                        if nf_err
1644
                            .lock()
1645
                            .unwrap_or_else(PoisonError::into_inner)
1646
                            .is_some()
1647
                        {
1648
                            return false;
1649
                        }
1650
                        Python::with_gil(|py| {
1651
                            match nf_obj
1652
                                .call1(py, (node_id,))
1653
                                .and_then(|r| r.extract::<bool>(py))
1654
                            {
1655
                                Ok(v) => v,
1656
                                Err(e) => {
1657
                                    *nf_err.lock().unwrap_or_else(PoisonError::into_inner) =
1658
                                        Some(e);
1659
                                    false
1660
                                }
1661
                            }
1662
                        })
1663
                    }),
1664
                    None,
1665
                    max_depth,
1666
                )
1667
            } else if let Some(rf_obj) = rel_filter.as_ref() {
1668
                let rf_err = error_cell.clone();
1669
                type NodeFilter = fn(u32, &CoreGraphSnapshot) -> bool;
1670
                self.snapshot.bidirectional_bfs::<NodeFilter, _>(
1671
                    &source_set,
1672
                    &target_set,
1673
                    rust_direction,
1674
                    rel_types_str.as_deref(),
1675
                    None,
1676
                    Some(
1677
                        move |from: u32,
1678
                              to: u32,
1679
                              rel_type: RelationshipType,
1680
                              csr_pos: u32,
1681
                              snapshot: &CoreGraphSnapshot|
1682
                              -> bool {
1683
                            if rf_err
1684
                                .lock()
1685
                                .unwrap_or_else(PoisonError::into_inner)
1686
                                .is_some()
1687
                            {
1688
                                return false;
1689
                            }
1690
                            Python::with_gil(|py| {
1691
                                let rel_type_str = snapshot
1692
                                    .resolve_string(rel_type.id())
1693
                                    .unwrap_or("")
1694
                                    .to_string();
1695
                                match rf_obj
1696
                                    .call1(py, (from, to, rel_type_str, csr_pos))
1697
                                    .and_then(|r| r.extract::<bool>(py))
1698
                                {
1699
                                    Ok(v) => v,
1700
                                    Err(e) => {
1701
                                        *rf_err.lock().unwrap_or_else(PoisonError::into_inner) =
1702
                                            Some(e);
1703
                                        false
1704
                                    }
1705
                                }
1706
                            })
1707
                        },
1708
                    ),
1709
                    max_depth,
1710
                )
1711
            } else {
1712
                type NodeFilter = fn(u32, &CoreGraphSnapshot) -> bool;
1713
                type RelFilter = fn(u32, u32, RelationshipType, u32, &CoreGraphSnapshot) -> bool;
1714
                self.snapshot.bidirectional_bfs::<NodeFilter, RelFilter>(
1715
                    &source_set,
1716
                    &target_set,
1717
                    rust_direction,
1718
                    rel_types_str.as_deref(),
1719
                    None,
1720
                    None,
1721
                    max_depth,
1722
                )
1723
            }
1724
        });
1725

1726
        // Propagate any Python exception captured during BFS
1727
        if let Some(err) = error_cell
1728
            .lock()
1729
            .unwrap_or_else(PoisonError::into_inner)
1730
            .take()
1731
        {
1732
            return Err(err);
1733
        }
1734

1735
        Ok((node_bitmap.iter().collect(), rel_bitmap.iter().collect()))
1736
    }
1737

1738
    /// BFS traversal from a set of starting nodes
1739
    ///
1740
    /// Performs BFS from the starting nodes, following rels in the specified direction.
1741
    /// Returns all nodes and relationships visited during the traversal.
1742
    ///
1743
    /// # Arguments
1744
    /// * `start_nodes` - List of starting node IDs
1745
    /// * `direction` - Direction of traversal (Direction.Outgoing, Direction.Incoming, or Direction.Both)
1746
    ///   - Outgoing: Follow outgoing rels
1747
    ///   - Incoming: Follow incoming rels
1748
    ///   - Both: Follow both outgoing and incoming rels
1749
    /// * `rel_types` - Optional list of relationship type names to filter by
1750
    /// * `node_filter` - Optional callable that takes (node_id: int) and returns bool.
1751
    ///   Returns True to include/continue from a node.
1752
    /// * `rel_filter` - Optional callable that takes (from_node: int, to_node: int, rel_type: str, csr_pos: int) and returns bool.
1753
    ///   Returns True to follow a relationship.
1754
    /// * `max_depth` - Optional maximum depth (default: no limit)
1755
    ///
1756
    /// # Returns
1757
    /// A tuple `(node_ids, rel_csr_positions)` where:
1758
    /// - `node_ids`: List of node IDs visited during traversal
1759
    /// - `rel_csr_positions`: List of relationship CSR positions traversed
1760
    ///
1761
    /// # Examples
1762
    /// ```python
1763
    /// from rustychickpeas import Direction
1764
    ///
1765
    /// # Simple BFS from a single node
1766
    /// nodes, rels = snapshot.bfs([0], Direction.Outgoing)
1767
    ///
1768
    /// # BFS with relationship type filter
1769
    /// nodes, rels = snapshot.bfs(
1770
    ///     [0], Direction.Outgoing,
1771
    ///     rel_types=["KNOWS", "WORKS_WITH"]
1772
    /// )
1773
    ///
1774
    /// # BFS with max depth
1775
    /// nodes, rels = snapshot.bfs(
1776
    ///     [0], Direction.Outgoing,
1777
    ///     max_depth=3
1778
    /// )
1779
    ///
1780
    /// # BFS with node filter (only "Person" nodes)
1781
    /// def node_filter(node_id):
1782
    ///     return "Person" in snapshot.node_labels(node_id)
1783
    ///
1784
    /// nodes, rels = snapshot.bfs(
1785
    ///     [0], Direction.Outgoing,
1786
    ///     node_filter=node_filter
1787
    /// )
1788
    /// ```
1789
    #[pyo3(signature = (start_nodes, direction, *, rel_types=None, node_filter=None, rel_filter=None, max_depth=None))]
1790
    #[allow(clippy::too_many_arguments)]
1791
    fn bfs(
1792
        &self,
1793
        py: Python<'_>,
1794
        start_nodes: Vec<u32>,
1795
        direction: Direction,
1796
        rel_types: Option<Vec<String>>,
1797
        node_filter: Option<PyObject>,
1798
        rel_filter: Option<PyObject>,
1799
        max_depth: Option<u32>,
1800
    ) -> PyResult<(Vec<u32>, Vec<u32>)> {
1801
        let start_set = NodeSet::from(RoaringBitmap::from_iter(start_nodes.iter().copied()));
1802

1803
        use rustychickpeas_core::types::Direction as CoreDirection;
1804
        let rust_direction = match direction {
1805
            Direction::Outgoing => CoreDirection::Outgoing,
1806
            Direction::Incoming => CoreDirection::Incoming,
1807
            Direction::Both => CoreDirection::Both,
1808
        };
1809

1810
        let rel_types_str: Option<Vec<&str>> = rel_types
1811
            .as_ref()
1812
            .map(|types| types.iter().map(|s| s.as_str()).collect());
1813

1814
        // Error cell to capture Python exceptions from filter callbacks
1815
        let error_cell: Arc<Mutex<Option<PyErr>>> = Arc::new(Mutex::new(None));
1816

1817
        // Release the GIL for the traversal; filter callbacks re-acquire it
1818
        // per invocation via Python::with_gil.
1819
        let (node_bitmap, rel_bitmap) = py.allow_threads(|| {
1820
            if let (Some(nf_obj), Some(rf_obj)) = (node_filter.as_ref(), rel_filter.as_ref()) {
1821
                let nf_err = error_cell.clone();
1822
                let rf_err = error_cell.clone();
1823
                self.snapshot.bfs(
1824
                    &start_set,
1825
                    rust_direction,
1826
                    rel_types_str.as_deref(),
1827
                    Some(move |node_id: u32, _snapshot: &CoreGraphSnapshot| -> bool {
1828
                        if nf_err
1829
                            .lock()
1830
                            .unwrap_or_else(PoisonError::into_inner)
1831
                            .is_some()
1832
                        {
1833
                            return false;
1834
                        }
1835
                        Python::with_gil(|py| {
1836
                            match nf_obj
1837
                                .call1(py, (node_id,))
1838
                                .and_then(|r| r.extract::<bool>(py))
1839
                            {
1840
                                Ok(v) => v,
1841
                                Err(e) => {
1842
                                    *nf_err.lock().unwrap_or_else(PoisonError::into_inner) =
1843
                                        Some(e);
1844
                                    false
1845
                                }
1846
                            }
1847
                        })
1848
                    }),
1849
                    Some(
1850
                        move |from: u32,
1851
                              to: u32,
1852
                              rel_type: RelationshipType,
1853
                              csr_pos: u32,
1854
                              snapshot: &CoreGraphSnapshot|
1855
                              -> bool {
1856
                            if rf_err
1857
                                .lock()
1858
                                .unwrap_or_else(PoisonError::into_inner)
1859
                                .is_some()
1860
                            {
1861
                                return false;
1862
                            }
1863
                            Python::with_gil(|py| {
1864
                                let rel_type_str = snapshot
1865
                                    .resolve_string(rel_type.id())
1866
                                    .unwrap_or("")
1867
                                    .to_string();
1868
                                match rf_obj
1869
                                    .call1(py, (from, to, rel_type_str, csr_pos))
1870
                                    .and_then(|r| r.extract::<bool>(py))
1871
                                {
1872
                                    Ok(v) => v,
1873
                                    Err(e) => {
1874
                                        *rf_err.lock().unwrap_or_else(PoisonError::into_inner) =
1875
                                            Some(e);
1876
                                        false
1877
                                    }
1878
                                }
1879
                            })
1880
                        },
1881
                    ),
1882
                    max_depth,
1883
                )
1884
            } else if let Some(nf_obj) = node_filter.as_ref() {
1885
                let nf_err = error_cell.clone();
1886
                type RelFilter = fn(u32, u32, RelationshipType, u32, &CoreGraphSnapshot) -> bool;
1887
                self.snapshot.bfs::<_, RelFilter>(
1888
                    &start_set,
1889
                    rust_direction,
1890
                    rel_types_str.as_deref(),
1891
                    Some(move |node_id: u32, _snapshot: &CoreGraphSnapshot| -> bool {
1892
                        if nf_err
1893
                            .lock()
1894
                            .unwrap_or_else(PoisonError::into_inner)
1895
                            .is_some()
1896
                        {
1897
                            return false;
1898
                        }
1899
                        Python::with_gil(|py| {
1900
                            match nf_obj
1901
                                .call1(py, (node_id,))
1902
                                .and_then(|r| r.extract::<bool>(py))
1903
                            {
1904
                                Ok(v) => v,
1905
                                Err(e) => {
1906
                                    *nf_err.lock().unwrap_or_else(PoisonError::into_inner) =
1907
                                        Some(e);
1908
                                    false
1909
                                }
1910
                            }
1911
                        })
1912
                    }),
1913
                    None,
1914
                    max_depth,
1915
                )
1916
            } else if let Some(rf_obj) = rel_filter.as_ref() {
1917
                let rf_err = error_cell.clone();
1918
                type NodeFilter = fn(u32, &CoreGraphSnapshot) -> bool;
1919
                self.snapshot.bfs::<NodeFilter, _>(
1920
                    &start_set,
1921
                    rust_direction,
1922
                    rel_types_str.as_deref(),
1923
                    None,
1924
                    Some(
1925
                        move |from: u32,
1926
                              to: u32,
1927
                              rel_type: RelationshipType,
1928
                              csr_pos: u32,
1929
                              snapshot: &CoreGraphSnapshot|
1930
                              -> bool {
1931
                            if rf_err
1932
                                .lock()
1933
                                .unwrap_or_else(PoisonError::into_inner)
1934
                                .is_some()
1935
                            {
1936
                                return false;
1937
                            }
1938
                            Python::with_gil(|py| {
1939
                                let rel_type_str = snapshot
1940
                                    .resolve_string(rel_type.id())
1941
                                    .unwrap_or("")
1942
                                    .to_string();
1943
                                match rf_obj
1944
                                    .call1(py, (from, to, rel_type_str, csr_pos))
1945
                                    .and_then(|r| r.extract::<bool>(py))
1946
                                {
1947
                                    Ok(v) => v,
1948
                                    Err(e) => {
1949
                                        *rf_err.lock().unwrap_or_else(PoisonError::into_inner) =
1950
                                            Some(e);
1951
                                        false
1952
                                    }
1953
                                }
1954
                            })
1955
                        },
1956
                    ),
1957
                    max_depth,
1958
                )
1959
            } else {
1960
                type NodeFilter = fn(u32, &CoreGraphSnapshot) -> bool;
1961
                type RelFilter = fn(u32, u32, RelationshipType, u32, &CoreGraphSnapshot) -> bool;
1962
                self.snapshot.bfs::<NodeFilter, RelFilter>(
1963
                    &start_set,
1964
                    rust_direction,
1965
                    rel_types_str.as_deref(),
1966
                    None,
1967
                    None,
1968
                    max_depth,
1969
                )
1970
            }
1971
        });
1972

1973
        // Propagate any Python exception captured during BFS
1974
        if let Some(err) = error_cell
1975
            .lock()
1976
            .unwrap_or_else(PoisonError::into_inner)
1977
            .take()
1978
        {
1979
            return Err(err);
1980
        }
1981

1982
        Ok((node_bitmap.iter().collect(), rel_bitmap.iter().collect()))
1983
    }
1984

1985
    /// Shortest hop-distance from `start` to every node reachable along `rel_types`
1986
    /// in `direction`, bounded to `max_depth` hops. Returns `{node_id: distance}`
1987
    /// (start is distance 0); `rel_types=None` follows every type. The typed
1988
    /// bounded BFS behind hop-distance filters (e.g. "friends 3..4 hops away").
1989
    #[pyo3(signature = (start, direction, *, rel_types=None, max_depth=None))]
1990
    fn bfs_distances(
1991
        &self,
1992
        py: Python<'_>,
1993
        start: u32,
1994
        direction: Direction,
1995
        rel_types: Option<Vec<String>>,
1996
        max_depth: Option<u32>,
1997
    ) -> std::collections::HashMap<u32, u32> {
1998
        let snapshot = self.snapshot.clone();
1999
        let dir: rustychickpeas_core::types::Direction = direction.into();
2000
        py.allow_threads(move || {
2001
            let types: Vec<&str> = rel_types
2002
                .as_ref()
2003
                .map(|t| t.iter().map(|s| s.as_str()).collect())
2004
                .unwrap_or_default();
2005
            snapshot
2006
                .bfs_distances(start, dir, types.as_slice(), max_depth)
2007
                .into_iter()
2008
                .collect()
2009
        })
2010
    }
2011

2012
    /// Check if a path exists between two nodes
2013
    #[pyo3(signature = (from_node, to_node, direction, *, rel_types=None, max_depth=None))]
2014
    fn can_reach(
2015
        &self,
2016
        from_node: u32,
2017
        to_node: u32,
2018
        direction: Direction,
2019
        rel_types: Option<Vec<String>>,
2020
        max_depth: Option<usize>,
2021
    ) -> PyResult<bool> {
2022
        if from_node >= self.snapshot.n_nodes {
2023
            return Err(PyErr::new::<pyo3::exceptions::PyValueError, _>(format!(
2024
                "from_node {} out of range (max: {})",
2025
                from_node,
2026
                self.snapshot.n_nodes.saturating_sub(1)
2027
            )));
2028
        }
2029
        if to_node >= self.snapshot.n_nodes {
2030
            return Err(PyErr::new::<pyo3::exceptions::PyValueError, _>(format!(
2031
                "to_node {} out of range (max: {})",
2032
                to_node,
2033
                self.snapshot.n_nodes.saturating_sub(1)
2034
            )));
2035
        }
2036

2037
        use rustychickpeas_core::types::Direction as CoreDirection;
2038
        let rust_direction = match direction {
2039
            Direction::Outgoing => CoreDirection::Outgoing,
2040
            Direction::Incoming => CoreDirection::Incoming,
2041
            Direction::Both => CoreDirection::Both,
2042
        };
2043

2044
        let rel_types_str: Option<Vec<&str>> = rel_types
2045
            .as_ref()
2046
            .map(|types| types.iter().map(|s| s.as_str()).collect());
2047

2048
        // can_reach takes Option<u32> for max_depth
2049
        let max_depth_u32 = max_depth.map(|d| d as u32);
2050

2051
        Ok(self.snapshot.can_reach(
2052
            from_node,
2053
            to_node,
2054
            rust_direction,
2055
            rel_types_str.as_deref(),
2056
            max_depth_u32,
2057
        ))
2058
    }
2059
}
2060

2061
/// A dense property column exposed to Python as a self-describing, buffer-protocol
2062
/// array (its dtype is intrinsic — no `.i64()` narrowing). Built by
2063
/// [`GraphSnapshot::column`]; holds an `Arc` to the snapshot so the zero-copy
2064
/// buffer it hands out (numpy / pyarrow / memoryview) stays valid for its lifetime.
2065
#[pyclass(name = "Column")]
2066
pub struct Column {
2067
    snapshot: Arc<CoreGraphSnapshot>,
2068
    key: String,
2069
    dtype: ColumnDtype,
2070
    len: usize,
2071
    itemsize: isize,
2072
    // One-element shape/strides the buffer view points at (must outlive the view;
2073
    // they live in this object, which view.obj keeps alive).
2074
    shape: [ffi::Py_ssize_t; 1],
2075
    strides: [ffi::Py_ssize_t; 1],
2076
    // Booleans are bit-packed in core; expanded to one 0/1 byte per node so the
2077
    // buffer has a standard layout. `None` (zero-copy) for the other dtypes.
2078
    bool_bytes: Option<Vec<u8>>,
2079
}
2080

2081
impl Column {
2082
    /// Build a Column for a dense node column, or `None` if absent / not dense.
2083
    fn build(snapshot: Arc<CoreGraphSnapshot>, key: &str) -> Option<Column> {
2084
        let col = snapshot.col(key)?;
2085
        let dtype = col.dtype();
2086
        let (len, itemsize, bool_bytes) = match dtype {
2087
            ColumnDtype::I64 => (col.i64().as_slice()?.len(), 8isize, None),
2088
            ColumnDtype::F64 => (col.f64().as_slice()?.len(), 8, None),
2089
            ColumnDtype::Str => (col.str().as_ids()?.len(), 4, None),
2090
            ColumnDtype::Bool => {
2091
                let bytes: Vec<u8> = col.bool().as_slice()?.iter().map(|b| *b as u8).collect();
2092
                (bytes.len(), 1, Some(bytes))
2093
            }
2094
        };
2095
        Some(Column {
2096
            snapshot,
2097
            key: key.to_string(),
2098
            dtype,
2099
            len,
2100
            itemsize,
2101
            shape: [len as ffi::Py_ssize_t],
2102
            strides: [itemsize as ffi::Py_ssize_t],
2103
            bool_bytes,
2104
        })
2105
    }
2106

2107
    /// Raw data pointer + struct-format char for the buffer view. The pointer is
2108
    /// into the immutable snapshot (or `bool_bytes`) and is stable for the lifetime.
2109
    fn buffer_ptr_format(&self) -> (*const u8, &'static [u8]) {
2110
        match self.dtype {
2111
            ColumnDtype::I64 => {
2112
                let s = self
2113
                    .snapshot
2114
                    .col(&self.key)
2115
                    .unwrap()
2116
                    .i64()
2117
                    .as_slice()
2118
                    .unwrap();
2119
                (s.as_ptr() as *const u8, b"q\0")
2120
            }
2121
            ColumnDtype::F64 => {
2122
                let s = self
2123
                    .snapshot
2124
                    .col(&self.key)
2125
                    .unwrap()
2126
                    .f64()
2127
                    .as_slice()
2128
                    .unwrap();
2129
                (s.as_ptr() as *const u8, b"d\0")
2130
            }
2131
            ColumnDtype::Str => {
2132
                let s = self
2133
                    .snapshot
2134
                    .col(&self.key)
2135
                    .unwrap()
2136
                    .str()
2137
                    .as_ids()
2138
                    .unwrap();
2139
                (s.as_ptr() as *const u8, b"I\0")
2140
            }
2141
            ColumnDtype::Bool => (self.bool_bytes.as_ref().unwrap().as_ptr(), b"B\0"),
2142
        }
2143
    }
2144
}
2145

2146
#[pymethods]
2147
impl Column {
2148
    /// The numpy/struct dtype name: 'int64' | 'float64' | 'bool' | 'string'.
2149
    #[getter]
2150
    fn dtype(&self) -> &'static str {
2151
        match self.dtype {
2152
            ColumnDtype::I64 => "int64",
2153
            ColumnDtype::F64 => "float64",
2154
            ColumnDtype::Bool => "bool",
2155
            ColumnDtype::Str => "string",
2156
        }
2157
    }
2158

2159
    fn __len__(&self) -> usize {
2160
        self.len
2161
    }
2162

2163
    fn __repr__(&self) -> String {
2164
        format!(
2165
            "Column(key='{}', dtype='{}', len={})",
2166
            self.key,
2167
            self.dtype(),
2168
            self.len
2169
        )
2170
    }
2171

2172
    /// The column as a plain Python list. String columns resolve interned ids to
2173
    /// `str`; numeric/bool columns return `int` / `float` / `bool`.
2174
    fn to_pylist(&self, py: Python<'_>) -> PyResult<PyObject> {
2175
        match self.dtype {
2176
            ColumnDtype::I64 => self
2177
                .snapshot
2178
                .col(&self.key)
2179
                .unwrap()
2180
                .i64()
2181
                .as_slice()
2182
                .unwrap()
2183
                .to_vec()
2184
                .into_py_any(py),
2185
            ColumnDtype::F64 => self
2186
                .snapshot
2187
                .col(&self.key)
2188
                .unwrap()
2189
                .f64()
2190
                .as_slice()
2191
                .unwrap()
2192
                .to_vec()
2193
                .into_py_any(py),
2194
            ColumnDtype::Bool => {
2195
                let v: Vec<bool> = self
2196
                    .bool_bytes
2197
                    .as_ref()
2198
                    .unwrap()
2199
                    .iter()
2200
                    .map(|&b| b != 0)
2201
                    .collect();
2202
                v.into_py_any(py)
2203
            }
2204
            ColumnDtype::Str => {
2205
                let ids = self
2206
                    .snapshot
2207
                    .col(&self.key)
2208
                    .unwrap()
2209
                    .str()
2210
                    .as_ids()
2211
                    .unwrap();
2212
                let v: Vec<&str> = ids
2213
                    .iter()
2214
                    .map(|&id| self.snapshot.resolve_string(id).unwrap_or(""))
2215
                    .collect();
2216
                v.into_py_any(py)
2217
            }
2218
        }
2219
    }
2220

2221
    /// Buffer protocol: expose the dense bytes zero-copy (read-only, 1-D,
2222
    /// C-contiguous). `view.obj` takes a new reference to this Column so the backing
2223
    /// memory stays alive while the view is held.
2224
    unsafe fn __getbuffer__(
2225
        slf: PyRef<'_, Self>,
2226
        view: *mut ffi::Py_buffer,
2227
        flags: c_int,
2228
    ) -> PyResult<()> {
2229
        if view.is_null() {
2230
            return Err(pyo3::exceptions::PyBufferError::new_err("view is null"));
2231
        }
2232
        if (flags & ffi::PyBUF_WRITABLE) == ffi::PyBUF_WRITABLE {
2233
            return Err(pyo3::exceptions::PyBufferError::new_err(
2234
                "column buffer is read-only",
2235
            ));
2236
        }
2237
        let (ptr, format) = slf.buffer_ptr_format();
2238
        let obj = slf.as_ptr();
2239
        ffi::Py_INCREF(obj);
2240
        (*view).obj = obj;
2241
        (*view).buf = ptr as *mut c_void;
2242
        (*view).len = (slf.len as isize) * slf.itemsize;
2243
        (*view).readonly = 1;
2244
        (*view).itemsize = slf.itemsize;
2245
        (*view).ndim = 1;
2246
        (*view).format = if (flags & ffi::PyBUF_FORMAT) == ffi::PyBUF_FORMAT {
2247
            format.as_ptr() as *mut c_char
2248
        } else {
2249
            std::ptr::null_mut()
2250
        };
2251
        (*view).shape = if (flags & ffi::PyBUF_ND) == ffi::PyBUF_ND {
2252
            slf.shape.as_ptr() as *mut ffi::Py_ssize_t
2253
        } else {
2254
            std::ptr::null_mut()
2255
        };
2256
        (*view).strides = if (flags & ffi::PyBUF_STRIDES) == ffi::PyBUF_STRIDES {
2257
            slf.strides.as_ptr() as *mut ffi::Py_ssize_t
2258
        } else {
2259
            std::ptr::null_mut()
2260
        };
2261
        (*view).suboffsets = std::ptr::null_mut();
2262
        (*view).internal = std::ptr::null_mut();
2263
        Ok(())
2264
    }
2265

2266
    unsafe fn __releasebuffer__(&self, _view: *mut ffi::Py_buffer) {
2267
        // Nothing to free: format is static, shape/strides live in self, and
2268
        // CPython decrefs view.obj.
2269
    }
2270
}
2271

2272
/// A `node -> node id` array, indexed by node id: `arr[node]` or `memoryview(arr)`
2273
/// for a hot loop (zero-copy buffer, format 'I'/u32; `u32::MAX` = none). Returned by
2274
/// [`GraphSnapshot::roots_via`] (the chain terminal of a functional relation) and
2275
/// [`GraphSnapshot::neighbor_via`] (its one-hop neighbor).
2276
#[pyclass]
2277
pub struct NodeArray {
2278
    inner: Arc<[u32]>,
2279
    shape: [ffi::Py_ssize_t; 1],
2280
    strides: [ffi::Py_ssize_t; 1],
2281
}
2282

2283
#[pymethods]
2284
impl NodeArray {
2285
    fn __len__(&self) -> usize {
2286
        self.inner.len()
2287
    }
2288

2289
    fn __getitem__(&self, index: isize) -> PyResult<u32> {
2290
        let n = self.inner.len() as isize;
2291
        let i = if index < 0 { index + n } else { index };
2292
        if i < 0 || i >= n {
2293
            return Err(pyo3::exceptions::PyIndexError::new_err(
2294
                "node id out of range",
2295
            ));
2296
        }
2297
        Ok(self.inner[i as usize])
2298
    }
2299

2300
    fn __repr__(&self) -> String {
2301
        format!("NodeArray(len={})", self.inner.len())
2302
    }
2303

2304
    /// The whole array as a Python list of node ids.
2305
    fn to_pylist(&self, py: Python<'_>) -> PyResult<PyObject> {
2306
        self.inner.to_vec().into_py_any(py)
2307
    }
2308

2309
    /// Buffer protocol: expose the u32 array zero-copy (read-only, 1-D, format 'I').
2310
    unsafe fn __getbuffer__(
2311
        slf: PyRef<'_, Self>,
2312
        view: *mut ffi::Py_buffer,
2313
        flags: c_int,
2314
    ) -> PyResult<()> {
2315
        if view.is_null() {
2316
            return Err(pyo3::exceptions::PyBufferError::new_err("view is null"));
2317
        }
2318
        if (flags & ffi::PyBUF_WRITABLE) == ffi::PyBUF_WRITABLE {
2319
            return Err(pyo3::exceptions::PyBufferError::new_err(
2320
                "roots buffer is read-only",
2321
            ));
2322
        }
2323
        let obj = slf.as_ptr();
2324
        ffi::Py_INCREF(obj);
2325
        (*view).obj = obj;
2326
        (*view).buf = slf.inner.as_ptr() as *mut c_void;
2327
        (*view).len = (slf.inner.len() as isize) * 4;
2328
        (*view).readonly = 1;
2329
        (*view).itemsize = 4;
2330
        (*view).ndim = 1;
2331
        (*view).format = if (flags & ffi::PyBUF_FORMAT) == ffi::PyBUF_FORMAT {
2332
            c"I".as_ptr() as *mut c_char
2333
        } else {
2334
            std::ptr::null_mut()
2335
        };
2336
        (*view).shape = if (flags & ffi::PyBUF_ND) == ffi::PyBUF_ND {
2337
            slf.shape.as_ptr() as *mut ffi::Py_ssize_t
2338
        } else {
2339
            std::ptr::null_mut()
2340
        };
2341
        (*view).strides = if (flags & ffi::PyBUF_STRIDES) == ffi::PyBUF_STRIDES {
2342
            slf.strides.as_ptr() as *mut ffi::Py_ssize_t
2343
        } else {
2344
            std::ptr::null_mut()
2345
        };
2346
        (*view).suboffsets = std::ptr::null_mut();
2347
        (*view).internal = std::ptr::null_mut();
2348
        Ok(())
2349
    }
2350

2351
    unsafe fn __releasebuffer__(&self, _view: *mut ffi::Py_buffer) {}
2352
}
2353

2354
/// An immutable `(node, node) -> count` map keyed by the *unordered* pair — the
2355
/// resident result of [`GraphSnapshot::fold_via`] (a one-mode projection). Kept native
2356
/// so it can drive a weighted [`GraphSnapshot::dijkstra`] without a per-rel Python
2357
/// callback; dict-like for inspection (`pw[(a, b)]`, `(a, b) in pw`, `len(pw)`,
2358
/// `pw.to_dict()`). Lookups normalize the key to `(min, max)`.
2359
#[pyclass]
2360
pub struct PairWeights {
2361
    inner: Arc<std::collections::HashMap<(u32, u32), u64>>,
2362
}
2363

2364
#[pymethods]
2365
impl PairWeights {
2366
    fn __len__(&self) -> usize {
2367
        self.inner.len()
2368
    }
2369

2370
    fn __contains__(&self, key: (u32, u32)) -> bool {
2371
        let (a, b) = key;
2372
        let k = if a < b { (a, b) } else { (b, a) };
2373
        self.inner.contains_key(&k)
2374
    }
2375

2376
    fn __getitem__(&self, key: (u32, u32)) -> PyResult<u64> {
2377
        let (a, b) = key;
2378
        let k = if a < b { (a, b) } else { (b, a) };
2379
        self.inner
2380
            .get(&k)
2381
            .copied()
2382
            .ok_or_else(|| pyo3::exceptions::PyKeyError::new_err(format!("{:?}", key)))
2383
    }
2384

2385
    /// The count for the unordered pair `(a, b)`, or `default` (`None`) when absent.
2386
    #[pyo3(signature = (a, b, default=None))]
2387
    fn get(&self, a: u32, b: u32, default: Option<u64>) -> Option<u64> {
2388
        let k = if a < b { (a, b) } else { (b, a) };
2389
        self.inner.get(&k).copied().or(default)
2390
    }
2391

2392
    fn __repr__(&self) -> String {
2393
        format!("PairWeights(pairs={})", self.inner.len())
2394
    }
2395

2396
    /// Materialize as a Python dict `{(a, b): count}` (keys are `(min, max)`).
2397
    fn to_dict(&self, py: Python<'_>) -> PyResult<PyObject> {
2398
        let d = pyo3::types::PyDict::new(py);
2399
        for (&(a, b), &c) in self.inner.iter() {
2400
            d.set_item((a, b), c)?;
2401
        }
2402
        d.into_py_any(py)
2403
    }
2404
}
2405

2406
/// Typed storage behind one [`RelView`] column; each variant is an
/// `Arc`-shared array.
enum RelArrayData {
    /// 32-bit unsigned values.
    U32(Arc<[u32]>),
    /// 64-bit signed integer values.
    I64(Arc<[i64]>),
    /// 64-bit floating-point values.
    F64(Arc<[f64]>),
}
2412

2413
impl RelArrayData {
2414
    fn len(&self) -> usize {
2415
        match self {
2416
            RelArrayData::U32(a) => a.len(),
2417
            RelArrayData::I64(a) => a.len(),
2418
            RelArrayData::F64(a) => a.len(),
2419
        }
2420
    }
2421
    fn clone_data(&self) -> RelArrayData {
2422
        match self {
2423
            RelArrayData::U32(a) => RelArrayData::U32(a.clone()),
2424
            RelArrayData::I64(a) => RelArrayData::I64(a.clone()),
2425
            RelArrayData::F64(a) => RelArrayData::F64(a.clone()),
2426
        }
2427
    }
2428
    fn ptr(&self) -> *mut c_void {
2429
        match self {
2430
            RelArrayData::U32(a) => a.as_ptr() as *mut c_void,
2431
            RelArrayData::I64(a) => a.as_ptr() as *mut c_void,
2432
            RelArrayData::F64(a) => a.as_ptr() as *mut c_void,
2433
        }
2434
    }
2435
    fn itemsize(&self) -> ffi::Py_ssize_t {
2436
        match self {
2437
            RelArrayData::U32(_) => 4,
2438
            _ => 8,
2439
        }
2440
    }
2441
    fn format(&self) -> *mut c_char {
2442
        let s: &[u8] = match self {
2443
            RelArrayData::U32(_) => b"I\0",
2444
            RelArrayData::I64(_) => b"q\0",
2445
            RelArrayData::F64(_) => b"d\0",
2446
        };
2447
        s.as_ptr() as *mut c_char
2448
    }
2449
}
2450

2451
/// One column of a [`RelView`] as a read-only, 1-D, buffer-protocol array — zero-copy
2452
/// `memoryview(...)` (format `'I'`=u32, `'q'`=i64, `'d'`=f64).
2453
#[pyclass]
2454
pub struct RelArray {
2455
    data: RelArrayData,
2456
    shape: [ffi::Py_ssize_t; 1],
2457
    strides: [ffi::Py_ssize_t; 1],
2458
}
2459

2460
impl RelArray {
2461
    fn new(data: RelArrayData) -> Self {
2462
        let shape = [data.len() as ffi::Py_ssize_t];
2463
        let strides = [data.itemsize()];
2464
        RelArray {
2465
            data,
2466
            shape,
2467
            strides,
2468
        }
2469
    }
2470
}
2471

2472
#[pymethods]
2473
impl RelArray {
2474
    fn __len__(&self) -> usize {
2475
        self.data.len()
2476
    }
2477

2478
    unsafe fn __getbuffer__(
2479
        slf: PyRef<'_, Self>,
2480
        view: *mut ffi::Py_buffer,
2481
        flags: c_int,
2482
    ) -> PyResult<()> {
2483
        if view.is_null() {
2484
            return Err(pyo3::exceptions::PyBufferError::new_err("view is null"));
2485
        }
2486
        if (flags & ffi::PyBUF_WRITABLE) == ffi::PyBUF_WRITABLE {
2487
            return Err(pyo3::exceptions::PyBufferError::new_err(
2488
                "RelArray is read-only",
2489
            ));
2490
        }
2491
        let obj = slf.as_ptr();
2492
        ffi::Py_INCREF(obj);
2493
        (*view).obj = obj;
2494
        (*view).buf = slf.data.ptr();
2495
        (*view).len = (slf.data.len() as ffi::Py_ssize_t) * slf.data.itemsize();
2496
        (*view).readonly = 1;
2497
        (*view).itemsize = slf.data.itemsize();
2498
        (*view).ndim = 1;
2499
        (*view).format = if (flags & ffi::PyBUF_FORMAT) == ffi::PyBUF_FORMAT {
2500
            slf.data.format()
2501
        } else {
2502
            std::ptr::null_mut()
2503
        };
2504
        (*view).shape = if (flags & ffi::PyBUF_ND) == ffi::PyBUF_ND {
2505
            slf.shape.as_ptr() as *mut ffi::Py_ssize_t
2506
        } else {
2507
            std::ptr::null_mut()
2508
        };
2509
        (*view).strides = if (flags & ffi::PyBUF_STRIDES) == ffi::PyBUF_STRIDES {
2510
            slf.strides.as_ptr() as *mut ffi::Py_ssize_t
2511
        } else {
2512
            std::ptr::null_mut()
2513
        };
2514
        (*view).suboffsets = std::ptr::null_mut();
2515
        (*view).internal = std::ptr::null_mut();
2516
        Ok(())
2517
    }
2518

2519
    unsafe fn __releasebuffer__(&self, _view: *mut ffi::Py_buffer) {}
2520
}
2521

2522
/// A bulk view of one node's rels (from [`GraphSnapshot::rel_view`]): `.neighbors`
2523
/// (u32) and `.col(key)` (i64/f64) as aligned, zero-copy [`RelArray`] buffers.
2524
#[pyclass]
2525
pub struct RelView {
2526
    neighbors: Arc<[u32]>,
2527
    cols: Vec<(String, RelArrayData)>,
2528
}
2529

2530
#[pymethods]
2531
impl RelView {
2532
    fn __len__(&self) -> usize {
2533
        self.neighbors.len()
2534
    }
2535

2536
    #[getter]
2537
    fn neighbors(&self) -> RelArray {
2538
        RelArray::new(RelArrayData::U32(self.neighbors.clone()))
2539
    }
2540

2541
    /// The aligned values for property `key` as a [`RelArray`], or `None` if `key`
2542
    /// was not requested.
2543
    fn col(&self, key: &str) -> Option<RelArray> {
2544
        self.cols
2545
            .iter()
2546
            .find(|(k, _)| k == key)
2547
            .map(|(_, d)| RelArray::new(d.clone_data()))
2548
    }
2549

2550
    fn __repr__(&self) -> String {
2551
        format!(
2552
            "RelView(len={}, cols={})",
2553
            self.neighbors.len(),
2554
            self.cols.len()
2555
        )
2556
    }
2557
}
2558

2559
/// A single grouping dimension in the Python-side aggregation spec.
#[derive(Clone)]
enum GroupSpec {
    /// Group by the raw `i64` value of the named column.
    Col(String),
    /// Group by the named column bucketed at the given ascending `bounds`.
    Bin(String, Vec<i64>),
}
2566

2567
/// Build the core [`rustychickpeas_core::Aggregation`] from a Python-side spec.
2568
/// All the scan/parallelism lives in core; this just translates the spec.
2569
#[allow(clippy::too_many_arguments)]
2570
fn build_core_agg<'a>(
2571
    snapshot: &'a CoreGraphSnapshot,
2572
    labels: &[String],
2573
    where_filters: &[(String, String, i64)],
2574
    having_filters: &[(String, String, i64)],
2575
    by_label: bool,
2576
    group: &[GroupSpec],
2577
    sum_col: Option<&str>,
2578
    through: Option<(&str, rustychickpeas_core::types::Direction)>,
2579
    neighbor_filter: Option<&[u32]>,
2580
    projected_filters: &[(Vec<u32>, String, Vec<ValueId>)],
2581
) -> PyResult<rustychickpeas_core::Aggregation<'a>> {
2582
    let op = |s: &str| {
×
2583
        AggOp::parse(s).map_err(|e| PyErr::new::<pyo3::exceptions::PyValueError, _>(e.to_string()))
×
2584
    };
2585
    let mut agg = snapshot.aggregate(labels.iter().cloned());
×
2586
    for (c, o, v) in where_filters {
×
2587
        agg = agg.filter(c.clone(), op(o)?, *v);
×
2588
    }
2589
    for (proj, col, allowed) in projected_filters {
×
2590
        agg = agg.filter_via(proj, col.clone(), allowed.iter().cloned());
×
2591
    }
2592
    for (c, o, v) in having_filters {
×
2593
        agg = agg.having(c.clone(), op(o)?, *v);
×
2594
    }
2595
    if by_label {
×
2596
        agg = agg.by_label();
×
2597
    }
2598
    for gs in group {
×
2599
        agg = match gs {
×
2600
            GroupSpec::Col(c) => agg.by(c.clone()),
×
2601
            GroupSpec::Bin(c, e) => agg.bin(c.clone(), e.clone()),
×
2602
        };
2603
    }
2604
    if let Some(c) = sum_col {
×
2605
        agg = agg.sum(c);
×
2606
    }
2607
    if let Some((rt, dir)) = through {
×
2608
        agg = agg.through(rt, dir);
×
2609
    }
2610
    if let Some(ids) = neighbor_filter {
×
2611
        agg = agg.only_neighbors(ids.iter().copied());
×
2612
    }
2613
    Ok(agg)
×
2614
}
2615

2616
/// Lazy neighbor-grouping query (see `GraphSnapshot.neighbor_groups`). Immutable:
2617
/// `.project(...)` returns a new builder; `.sizes()` / `.top_by_size(...)` run it
2618
/// in parallel with the GIL released.
2619
#[pyclass]
2620
#[derive(Clone)]
2621
pub struct NeighborGroups {
2622
    snapshot: Arc<CoreGraphSnapshot>,
2623
    sources: Vec<u32>,
2624
    rel_type: String,
2625
    direction: rustychickpeas_core::types::Direction,
2626
    project: Vec<(rustychickpeas_core::types::Direction, String)>,
2627
}
2628

2629
#[pymethods]
2630
impl NeighborGroups {
2631
    /// Project each neighbor to its group node via a `follow`-style list of
2632
    /// `(direction, rel_type)` steps. Returns a new builder.
2633
    fn project(&self, steps: Vec<(Direction, String)>) -> NeighborGroups {
2634
        NeighborGroups {
2635
            snapshot: self.snapshot.clone(),
2636
            sources: self.sources.clone(),
2637
            rel_type: self.rel_type.clone(),
2638
            direction: self.direction,
2639
            project: steps.into_iter().map(|(d, r)| (d.into(), r)).collect(),
2640
        }
2641
    }
2642

2643
    /// Per source, the size of its largest cohort: `(source, size)`.
2644
    fn sizes(&self, py: Python<'_>) -> Vec<(u32, u32)> {
2645
        let snapshot = self.snapshot.clone();
2646
        let sources = self.sources.clone();
2647
        let rel_type = self.rel_type.clone();
2648
        let direction = self.direction;
2649
        let project = self.project.clone();
2650
        py.allow_threads(move || {
2651
            let steps: Vec<(rustychickpeas_core::types::Direction, &str)> =
2652
                project.iter().map(|(d, r)| (*d, r.as_str())).collect();
2653
            snapshot
2654
                .neighbor_groups(&sources, rel_type.as_str(), direction)
2655
                .project(&steps)
2656
                .sizes()
2657
        })
2658
    }
2659

2660
    /// The top `n` sources by largest cohort size: `(source, size)`, size
2661
    /// descending. Ties break by the `tie` node property (read as i64, ascending)
2662
    /// when given — so the order can match a query's output-id ordering — else by
2663
    /// source id ascending.
2664
    #[pyo3(signature = (n, tie=None))]
2665
    fn top_by_size(&self, py: Python<'_>, n: usize, tie: Option<String>) -> Vec<(u32, u32)> {
2666
        let snapshot = self.snapshot.clone();
2667
        let sources = self.sources.clone();
2668
        let rel_type = self.rel_type.clone();
2669
        let direction = self.direction;
2670
        let project = self.project.clone();
2671
        py.allow_threads(move || {
2672
            let steps: Vec<(rustychickpeas_core::types::Direction, &str)> =
2673
                project.iter().map(|(d, r)| (*d, r.as_str())).collect();
2674
            snapshot
2675
                .neighbor_groups(&sources, rel_type.as_str(), direction)
2676
                .project(&steps)
2677
                .top_by_size(n, tie.as_deref())
2678
        })
2679
    }
2680
}
2681

2682
/// Fluent aggregation builder (immutable: each step returns a new builder), created
2683
/// by [`GraphSnapshot::aggregate`]. `.run()` executes the scan and returns an
2684
/// [`AggResult`].
2685
#[pyclass(name = "Aggregation")]
2686
#[derive(Clone)]
2687
pub struct Aggregation {
2688
    snapshot: Arc<CoreGraphSnapshot>,
2689
    labels: Vec<String>,
2690
    where_filters: Vec<(String, String, i64)>,
2691
    having_filters: Vec<(String, String, i64)>,
2692
    by_label: bool,
2693
    group: Vec<GroupSpec>,
2694
    sum_col: Option<String>,
2695
    through: Option<(String, rustychickpeas_core::types::Direction)>,
2696
    neighbor_filter: Option<Vec<u32>>,
2697
    /// `(projection, column, allowed value ids)` projected-property population filters.
2698
    projected_filters: Vec<(Vec<u32>, String, Vec<ValueId>)>,
2699
}
2700

2701
#[pymethods]
2702
impl Aggregation {
2703
    fn __repr__(&self) -> String {
2704
        format!(
2705
            "Aggregation(labels={:?}, where={}, having={}, group_dims={}, sum={:?})",
2706
            self.labels,
2707
            self.where_filters.len(),
2708
            self.having_filters.len(),
2709
            self.by_label as usize + self.group.len(),
2710
            self.sum_col,
2711
        )
2712
    }
2713

2714
    /// Population predicate `column op value` (op ∈ `<,<=,>,>=,==,!=`); rows passing
2715
    /// all of these count toward `total`.
2716
    #[pyo3(name = "where")]
2717
    fn where_(&self, column: String, op: String, value: i64) -> Aggregation {
2718
        let mut a = self.clone();
2719
        a.where_filters.push((column, op, value));
2720
        a
2721
    }
2722

2723
    /// Population predicate on a *projected* node: keep a source whose projected
2724
    /// node (`projection[source]`, e.g. a `roots_via` array mapping a message to its
2725
    /// thread root) has `column` in `values`. Any value type (membership test, not
2726
    /// the i64 comparison of `where`); strings not interned in this snapshot can't
2727
    /// match and are dropped. Applied with the `where` filters.
2728
    fn where_via(
2729
        &self,
2730
        projection: &NodeArray,
2731
        column: String,
2732
        values: Vec<Bound<'_, PyAny>>,
2733
    ) -> PyResult<Aggregation> {
2734
        use rustychickpeas_core::PropertyValue;
2735
        let mut allowed: Vec<ValueId> = Vec::with_capacity(values.len());
2736
        for v in &values {
2737
            let vid = match py_to_property_value(v)? {
2738
                PropertyValue::String(s) => self.snapshot.atoms.get_id(&s).map(ValueId::Str),
2739
                PropertyValue::Integer(i) => Some(ValueId::I64(i)),
2740
                PropertyValue::Float(f) => Some(ValueId::from_f64(f)),
2741
                PropertyValue::Boolean(b) => Some(ValueId::Bool(b)),
2742
                PropertyValue::InternedString(_) => {
2743
                    return Err(PyErr::new::<pyo3::exceptions::PyValueError, _>(
2744
                        "InternedString not supported here",
2745
                    ))
2746
                }
2747
            };
2748
            if let Some(vid) = vid {
2749
                allowed.push(vid);
2750
            }
2751
        }
2752
        let mut a = self.clone();
2753
        a.projected_filters
2754
            .push((projection.inner.to_vec(), column, allowed));
2755
        Ok(a)
2756
    }
2757

2758
    /// Extra predicate applied to grouped rows only (after the population filters).
2759
    fn having(&self, column: String, op: String, value: i64) -> Aggregation {
2760
        let mut a = self.clone();
2761
        a.having_filters.push((column, op, value));
2762
        a
2763
    }
2764

2765
    /// Group by the source node label (returned in rows as its name).
2766
    fn by_label(&self) -> Aggregation {
2767
        let mut a = self.clone();
2768
        a.by_label = true;
2769
        a
2770
    }
2771

2772
    /// Group by a dense `i64` column's value.
2773
    fn by(&self, column: String) -> Aggregation {
2774
        let mut a = self.clone();
2775
        a.group.push(GroupSpec::Col(column));
2776
        a
2777
    }
2778

2779
    /// Group by a column bucketed at ascending `bounds` (bucket = count of
2780
    /// `bounds <= value`); the row field is `"{column}_bin"`.
2781
    fn bin(&self, column: String, bounds: Vec<i64>) -> Aggregation {
2782
        let mut a = self.clone();
2783
        a.group.push(GroupSpec::Bin(column, bounds));
2784
        a
2785
    }
2786

2787
    /// Also sum this `i64` column per group (row field `"sum"`).
2788
    fn sum(&self, column: String) -> Aggregation {
2789
        let mut a = self.clone();
2790
        a.sum_col = Some(column);
2791
        a
2792
    }
2793

2794
    /// Count rels of `rel_type`/`direction` out of each source node instead of
2795
    /// counting nodes, grouping additionally by the neighbor id (row field
2796
    /// `"neighbor"`). `total` still counts source nodes.
2797
    fn through(&self, rel_type: String, direction: Direction) -> Aggregation {
2798
        let mut a = self.clone();
2799
        a.through = Some((rel_type, direction.into()));
2800
        a
2801
    }
2802

2803
    /// With `through`, count only neighbors whose node id is in `node_ids` (others
2804
    /// skipped), so `.rows` has just those neighbors.
2805
    fn only_neighbors(&self, node_ids: Vec<u32>) -> Aggregation {
2806
        let mut a = self.clone();
2807
        a.neighbor_filter = Some(node_ids);
2808
        a
2809
    }
2810

2811
    /// Execute: returns an [`AggResult`] with `.total` and self-describing dict
2812
    /// `.rows` (keys: the group fields, then `"count"` and `"sum"` if requested).
2813
    fn run(&self, py: Python<'_>) -> PyResult<AggResult> {
2814
        let agg = build_core_agg(
2815
            &self.snapshot,
2816
            &self.labels,
2817
            &self.where_filters,
2818
            &self.having_filters,
2819
            self.by_label,
2820
            &self.group,
2821
            self.sum_col.as_deref(),
2822
            self.through.as_ref().map(|(rt, dir)| (rt.as_str(), *dir)),
2823
            self.neighbor_filter.as_deref(),
2824
            &self.projected_filters,
2825
        )?;
2826
        // The parallel scan lives in core; release the GIL while it runs.
2827
        let res = py
2828
            .allow_threads(|| agg.run())
2829
            .map_err(|e| PyErr::new::<pyo3::exceptions::PyValueError, _>(e.to_string()))?;
2830
        let has_sum = self.sum_col.is_some();
2831

2832
        let rows = pyo3::types::PyList::empty(py);
2833
        for row in &res.rows {
2834
            let d = pyo3::types::PyDict::new(py);
2835
            for (pos, (field, val)) in res.fields.iter().zip(row.key.iter()).enumerate() {
2836
                if self.by_label && pos == 0 {
2837
                    // The label key is its index into `labels`; emit the name.
2838
                    d.set_item(field, &self.labels[*val as usize])?;
2839
                } else {
2840
                    d.set_item(field, *val)?;
2841
                }
2842
            }
2843
            d.set_item("count", row.count)?;
2844
            if has_sum {
2845
                d.set_item("sum", row.sum)?;
2846
            }
2847
            rows.append(d)?;
2848
        }
2849
        Ok(AggResult {
2850
            total: res.total,
2851
            rows: rows.into_any().unbind(),
2852
        })
2853
    }
2854
}
2855

2856
/// Result of [`Aggregation::run`]: `total` (population count) and `rows`
2857
/// (a list of self-describing dicts).
2858
#[pyclass(name = "AggResult")]
2859
pub struct AggResult {
2860
    #[pyo3(get)]
2861
    total: u64,
2862
    #[pyo3(get)]
2863
    rows: PyObject,
2864
}
2865

2866
#[pymethods]
2867
impl AggResult {
2868
    fn __repr__(&self, py: Python<'_>) -> PyResult<String> {
2869
        let n = self.rows.bind(py).len()?;
2870
        Ok(format!(
2871
            "AggResult(total={}, rows={} groups)",
2872
            self.total, n
2873
        ))
2874
    }
2875
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc