• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

vortex-data / vortex / 16331938722

16 Jul 2025 10:49PM UTC coverage: 80.702% (-0.9%) from 81.557%
16331938722

push

github

web-flow
feat: build with stable rust (#3881)

120 of 173 new or added lines in 28 files covered. (69.36%)

174 existing lines in 102 files now uncovered.

41861 of 51871 relevant lines covered (80.7%)

157487.71 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.71
/vortex-datafusion/src/persistent/sink.rs
1
// SPDX-License-Identifier: Apache-2.0
2
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3

4
use std::any::Any;
5
use std::sync::Arc;
6
use std::sync::atomic::{AtomicU64, Ordering};
7

8
use async_trait::async_trait;
9
use datafusion::arrow::datatypes::SchemaRef;
10
use datafusion::common::DataFusionError;
11
use datafusion::common::runtime::SpawnedTask;
12
use datafusion::datasource::file_format::write::demux::DemuxedStreamReceiver;
13
use datafusion::datasource::physical_plan::{FileSink, FileSinkConfig};
14
use datafusion::datasource::sink::DataSink;
15
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
16
use datafusion::physical_plan::metrics::MetricsSet;
17
use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
18
use futures::StreamExt;
19
use object_store::ObjectStore;
20
use tokio_stream::wrappers::ReceiverStream;
21
use vortex::ArrayRef;
22
use vortex::arrow::FromArrowArray;
23
use vortex::dtype::DType;
24
use vortex::dtype::arrow::FromArrowType;
25
use vortex::error::VortexResult;
26
use vortex::file::VortexWriteOptions;
27
use vortex::stream::ArrayStreamAdapter;
28

29
/// A sink that writes incoming data out as Vortex files.
///
/// It implements both `DataSink` and `FileSink`; the actual writing happens in
/// `FileSink::spawn_writer_tasks_and_join`.
pub struct VortexSink {
30
    /// File sink configuration, returned by `FileSink::config`. Its
    /// `output_schema` is used to derive the Vortex dtype of written files.
    config: FileSinkConfig,
31
    /// Schema returned by `DataSink::schema`.
    schema: SchemaRef,
32
}
33

34
impl VortexSink {
35
    /// Builds a sink from a file sink configuration and the schema the sink reports.
    ///
    /// Both arguments are stored as-is; nothing is validated here.
    pub fn new(config: FileSinkConfig, schema: SchemaRef) -> Self {
2✔
36
        Self { config, schema }
2✔
37
    }
2✔
38
}
39

40
// Hand-written `Debug` that prints only the type name; neither field is included.
impl std::fmt::Debug for VortexSink {
41
    /// Formats the sink as a `VortexSink` debug struct with no fields.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
×
42
        f.debug_struct("VortexSink").finish()
×
43
    }
×
44
}
45

46
impl DisplayAs for VortexSink {
47
    /// Writes the literal text `VortexSink` to `f`.
    ///
    /// The three display format types matched below all produce the same output.
    fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
×
48
        match t {
×
49
            DisplayFormatType::Default
50
            | DisplayFormatType::Verbose
51
            | DisplayFormatType::TreeRender => {
52
                write!(f, "VortexSink")
×
53
            }
54
        }
55
    }
×
56
}
57

58
// `DataSink` implementation: exposes the stored schema and forwards writes to
// the `FileSink` implementation of this type.
#[async_trait]
59
impl DataSink for VortexSink {
60
    /// Returns `self` as `&dyn Any`.
    fn as_any(&self) -> &dyn Any {
×
61
        self
×
62
    }
×
63

64
    /// Always returns `None`; this sink does not report a metrics set.
    fn metrics(&self) -> Option<MetricsSet> {
×
65
        None
×
66
    }
×
67

68
    /// Returns the schema this sink was constructed with.
69
    fn schema(&self) -> &SchemaRef {
2✔
70
        &self.schema
2✔
71
    }
2✔
72

73
    /// Forwards `data` and `context` unchanged to `FileSink::write_all` and
    /// returns its result.
    ///
    /// NOTE(review): presumably the returned `u64` is the number of rows
    /// written, and `FileSink::write_all` is what eventually invokes
    /// `spawn_writer_tasks_and_join` — confirm against the DataFusion docs.
    async fn write_all(
74
        &self,
75
        data: SendableRecordBatchStream,
76
        context: &Arc<TaskContext>,
77
    ) -> datafusion::common::Result<u64> {
4✔
78
        // The fully qualified path selects the `FileSink` method rather than
        // this `DataSink` method of the same name.
        FileSink::write_all(self, data, context).await
2✔
79
    }
4✔
80
}
81

82
// `FileSink` implementation: this is where Vortex files are actually written.
#[async_trait]
83
impl FileSink for VortexSink {
84
    /// Returns the file sink configuration this sink was constructed with.
    fn config(&self) -> &FileSinkConfig {
2✔
85
        &self.config
2✔
86
    }
2✔
87

88
    /// Writes each stream received from `file_stream_rx` to its own path in
    /// `object_store` and returns the total number of rows counted.
    ///
    /// For every `(path, rx)` pair received, the receiver is wrapped in a
    /// stream, each item is converted into a Vortex array, and the stream is
    /// written with default `VortexWriteOptions`. Each write is awaited before
    /// the next pair is received. Once the receiver yields no more pairs,
    /// `demux_task` is joined.
    ///
    /// `_context` is not used.
    ///
    /// # Errors
    ///
    /// - `DataFusionError::Execution` if writing a file fails.
    /// - `DataFusionError::ExecutionJoin` if joining `demux_task` fails.
    /// - Whatever error `demux_task` itself returned.
    async fn spawn_writer_tasks_and_join(
89
        &self,
90
        _context: &Arc<TaskContext>,
91
        demux_task: SpawnedTask<datafusion::common::Result<()>>,
92
        mut file_stream_rx: DemuxedStreamReceiver,
93
        object_store: Arc<dyn ObjectStore>,
94
    ) -> datafusion::common::Result<u64> {
4✔
95
        // This is a hack: rows are counted as a side effect of the stream `map`
        // closure below, so the counter is shared through an `Arc<AtomicU64>`.
96
        let row_counter = Arc::new(AtomicU64::new(0));
2✔
97

98
        // TODO(adamg):
99
        // 1. We only write one file at a time
100
        // 2. We can probably be better at signaling how much memory we're consuming (potentially when reading too), see ParquetSink::spawn_writer_tasks_and_join.
101
        while let Some((path, rx)) = file_stream_rx.recv().await {
4✔
102
            // Shadow with a clone so the `move` closure owns its own handle.
            let row_counter = row_counter.clone();
2✔
103
            let stream = ReceiverStream::new(rx).map(move |rb| {
2✔
104
                row_counter.fetch_add(rb.num_rows() as u64, Ordering::Relaxed);
2✔
105
                // NOTE(review): the `false` argument is presumably the
                // nullability flag of the resulting array — confirm against
                // `FromArrowArray`.
                VortexResult::Ok(ArrayRef::from_arrow(rb, false))
2✔
106
            });
2✔
107
            // Derive the Vortex dtype from the configured output schema.
            let dtype = DType::from_arrow(self.config.output_schema.as_ref());
2✔
108
            let stream_adapter = ArrayStreamAdapter::new(dtype, stream);
2✔
109

110
            // Write this stream to `path`; a failure aborts the whole call.
            VortexWriteOptions::default()
2✔
111
                .write_object_store(&object_store, &path, stream_adapter)
2✔
112
                .await
2✔
113
                .map_err(|e| {
2✔
114
                    DataFusionError::Execution(format!("Failed to write Vortex file: {e}"))
×
UNCOV
115
                })?;
×
116
        }
117

118
        // The first `?` propagates a join failure, the second the task's own result.
        demux_task
2✔
119
            .join_unwind()
2✔
120
            .await
2✔
121
            .map_err(DataFusionError::ExecutionJoin)??;
2✔
122

123
        Ok(row_counter.load(Ordering::SeqCst))
2✔
124
    }
4✔
125
}
126

127
// Tests for inserting rows into a Vortex-backed external table.
#[cfg(test)]
128
mod tests {
129
    use std::sync::Arc;
130

131
    use datafusion::datasource::DefaultTableSource;
132
    use datafusion::execution::SessionStateBuilder;
133
    use datafusion::logical_expr::{Expr, LogicalPlan, LogicalPlanBuilder, Values};
134
    use datafusion::prelude::SessionContext;
135
    use datafusion::scalar::ScalarValue;
136
    use tempfile::TempDir;
137

138
    use crate::persistent::{VortexFormatFactory, register_vortex_format_factory};
139

140
    /// Creates an external Vortex table in a temporary directory, inserts one
    /// row through a hand-built logical plan and one through SQL, then asserts
    /// that the table contains two rows.
    #[tokio::test]
141
    async fn test_insert_into() {
1✔
142
        let dir = TempDir::new().unwrap();
1✔
143

144
        // Build a session whose state has the Vortex format factory registered.
        let factory = VortexFormatFactory::default();
1✔
145
        let mut session_state_builder = SessionStateBuilder::new().with_default_features();
1✔
146
        register_vortex_format_factory(factory, &mut session_state_builder);
1✔
147
        let session = SessionContext::new_with_state(session_state_builder.build());
1✔
148

149
        // Create the external table, located in the temporary directory.
        session
1✔
150
            .sql(&format!(
1✔
151
                "CREATE EXTERNAL TABLE my_tbl \
1✔
152
                    (c1 VARCHAR NOT NULL, c2 INT NOT NULL) \
1✔
153
                STORED AS vortex 
1✔
154
                LOCATION '{}/';",
1✔
155
                dir.path().to_str().unwrap()
1✔
156
            ))
1✔
157
            .await
1✔
158
            .unwrap();
1✔
159

160
        let my_tbl = session.table("my_tbl").await.unwrap();
1✔
161

162
        // It's valuable to have two insert code paths because they actually behave slightly differently
        // First path: a `Values` plan built by hand and wrapped in an insert-into plan.
163
        let values = Values {
1✔
164
            schema: Arc::new(my_tbl.schema().clone()),
1✔
165
            values: vec![vec![
1✔
166
                Expr::Literal(ScalarValue::new_utf8view("hello"), None),
1✔
167
                Expr::Literal(42_i32.into(), None),
1✔
168
            ]],
1✔
169
        };
1✔
170

171
        let tbl_provider = session.table_provider("my_tbl").await.unwrap();
1✔
172

173
        let logical_plan = LogicalPlanBuilder::insert_into(
1✔
174
            LogicalPlan::Values(values.clone()),
1✔
175
            "my_tbl",
176
            Arc::new(DefaultTableSource::new(tbl_provider)),
1✔
177
            datafusion::logical_expr::dml::InsertOp::Append,
1✔
178
        )
179
        .unwrap()
1✔
180
        .build()
1✔
181
        .unwrap();
1✔
182

183
        // Execute the hand-built insert and drain its output.
        session
1✔
184
            .execute_logical_plan(logical_plan)
1✔
185
            .await
1✔
186
            .unwrap()
1✔
187
            .collect()
1✔
188
            .await
1✔
189
            .unwrap();
1✔
190

191
        // Second path: a plain SQL `INSERT` statement.
        session
1✔
192
            .sql("INSERT INTO my_tbl VALUES ('world', 24);")
1✔
193
            .await
1✔
194
            .unwrap()
1✔
195
            .collect()
1✔
196
            .await
1✔
197
            .unwrap();
1✔
198

199
        // Print the table contents; this also exercises a read of the table.
        my_tbl.clone().show().await.unwrap();
1✔
200

201
        // Both inserted rows must be visible when the table is looked up again.
        assert_eq!(
1✔
202
            session
1✔
203
                .table("my_tbl")
1✔
204
                .await
1✔
205
                .unwrap()
1✔
206
                .count()
1✔
207
                .await
1✔
208
                .unwrap(),
1✔
209
            2
1✔
210
        );
1✔
211
    }
1✔
212
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc