• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 3929938005

pending completion
3929938005

push

github

GitHub
Merge #713

84930 of 96741 relevant lines covered (87.79%)

79640.1 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.68
/operators/src/source/csv.rs
1
use std::path::PathBuf;
2
use std::pin::Pin;
3
use std::sync::{Arc, Mutex};
4
use std::{fs::File, sync::atomic::AtomicBool};
5

6
use csv::{Position, Reader, StringRecord};
7
use futures::stream::BoxStream;
8
use futures::task::{Context, Poll};
9
use futures::{Stream, StreamExt};
10
use geoengine_datatypes::dataset::DataId;
11
use geoengine_datatypes::primitives::VectorQueryRectangle;
12
use serde::{Deserialize, Serialize};
13
use snafu::{ensure, OptionExt, ResultExt};
14
use tracing::{span, Level};
15

16
use geoengine_datatypes::collections::{
17
    BuilderProvider, GeoFeatureCollectionRowBuilder, MultiPointCollection, VectorDataType,
18
};
19
use geoengine_datatypes::{
20
    primitives::{BoundingBox2D, Coordinate2D, TimeInterval},
21
    spatial_reference::SpatialReference,
22
};
23

24
use crate::engine::{CreateSpan, QueryProcessor};
25
use crate::engine::{
26
    InitializedVectorOperator, OperatorData, OperatorName, QueryContext, SourceOperator,
27
    TypedVectorQueryProcessor, VectorOperator, VectorQueryProcessor, VectorResultDescriptor,
28
};
29
use crate::error;
30
use crate::util::Result;
31
use async_trait::async_trait;
32
use std::sync::atomic::Ordering;
33

34
/// Parameters for the CSV Source Operator
///
/// # Examples
///
/// ```rust
/// use serde_json::{Result, Value};
/// use geoengine_operators::source::{CsvSourceParameters, CsvSource};
/// use geoengine_operators::source::{CsvGeometrySpecification, CsvTimeSpecification};
///
/// let json_string = r#"
///     {
///         "type": "CsvSource",
///         "params": {
///             "filePath": "/foo/bar.csv",
///             "fieldSeparator": ",",
///             "geometry": {
///                 "type": "xy",
///                 "x": "x",
///                 "y": "y"
///             }
///         }
///     }"#;
///
/// let operator: CsvSource = serde_json::from_str(json_string).unwrap();
///
/// assert_eq!(operator, CsvSource {
///     params: CsvSourceParameters {
///         file_path: "/foo/bar.csv".into(),
///         field_separator: ',',
///         geometry: CsvGeometrySpecification::XY { x: "x".into(), y: "y".into() },
///         time: CsvTimeSpecification::None,
///     },
/// });
/// ```
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct CsvSourceParameters {
    // Path to the CSV file on disk.
    pub file_path: PathBuf,
    // Column delimiter; must be an ASCII character (enforced in `CsvSourceStream::new`).
    pub field_separator: char,
    // Which CSV columns hold the point geometry.
    pub geometry: CsvGeometrySpecification,
    // Temporal specification; `serde(default)` makes it optional in JSON.
    #[serde(default)]
    pub time: CsvTimeSpecification,
}
77

78
/// Specifies which CSV columns hold the geometry.
///
/// Serialized with an internal `"type"` tag in lowercase,
/// e.g. `{"type": "xy", "x": "…", "y": "…"}`.
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone)]
#[serde(tag = "type", rename_all = "lowercase")]
pub enum CsvGeometrySpecification {
    /// Two separate columns, named by `x` and `y`, hold the coordinates.
    #[allow(clippy::upper_case_acronyms)]
    XY { x: String, y: String },
}
84

85
/// Temporal specification for the CSV source.
///
/// Currently only `None` is supported, i.e. no time column is read.
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone)]
pub enum CsvTimeSpecification {
    None,
}
89

90
impl Default for CsvTimeSpecification {
91
    fn default() -> Self {
×
92
        Self::None
×
93
    }
×
94
}
95

96
/// Internal state machine for lazily initializing the CSV reader.
enum ReaderState {
    /// The reader was created but the header has not been parsed yet.
    Untouched(Reader<File>),
    /// The header was parsed and records are being streamed.
    OnGoing {
        header: ParsedHeader,
        records: csv::StringRecordsIntoIter<File>,
    },
    /// Setup consumed the reader and failed; no records can be produced.
    Error,
}
104

105
impl ReaderState {
106
    pub fn setup_once(&mut self, geometry_specification: CsvGeometrySpecification) -> Result<()> {
15✔
107
        if let ReaderState::Untouched(..) = self {
15✔
108
            // pass
6✔
109
        } else {
6✔
110
            return Ok(());
9✔
111
        }
112

113
        let old_state = std::mem::replace(self, ReaderState::Error);
6✔
114

115
        if let ReaderState::Untouched(mut csv_reader) = old_state {
6✔
116
            let header = match CsvSourceStream::setup_read(geometry_specification, &mut csv_reader)
6✔
117
            {
118
                Ok(header) => header,
5✔
119
                Err(error) => return Err(error),
1✔
120
            };
121

122
            let mut records = csv_reader.into_records();
5✔
123

5✔
124
            // consume the first row, which is the header
5✔
125
            if header.has_header {
5✔
126
                // TODO: throw error
5✔
127
                records.next();
5✔
128
            }
5✔
129

130
            *self = ReaderState::OnGoing { header, records }
5✔
131
        }
×
132

133
        Ok(())
5✔
134
    }
15✔
135
}
136

137
/// A stream of `MultiPointCollection` chunks read from a CSV file.
///
/// Reading happens on a blocking worker thread; `poll_next` hands work off to
/// the worker and is woken again once a chunk (or an error) is available.
pub struct CsvSourceStream {
    parameters: CsvSourceParameters,
    // Only points within this bounding box are emitted.
    bbox: BoundingBox2D,
    // Maximum number of points per emitted collection.
    chunk_size: usize,
    // Shared reader state machine, advanced by the worker thread.
    reader_state: Arc<Mutex<ReaderState>>,
    // Set while a worker thread is computing the next chunk.
    thread_is_computing: Arc<AtomicBool>,
    // Result slot filled by the worker: outer `Option` = "result ready",
    // inner `Option` = stream item (`None` terminates the stream).
    #[allow(clippy::option_option)]
    poll_result: Arc<Mutex<Option<Option<Result<MultiPointCollection>>>>>,
}
146

147
/// The CSV source operator: a `SourceOperator` parameterized with [`CsvSourceParameters`].
pub type CsvSource = SourceOperator<CsvSourceParameters>;
148

149
impl OperatorName for CsvSource {
    // Type name used to identify this operator, e.g. in serialized workflows.
    const TYPE_NAME: &'static str = "CsvSource";
}
152

153
impl OperatorData for CsvSourceParameters {
    /// A CSV source reads from a file path, so it contributes no dataset ids.
    fn data_ids_collect(&self, _data_ids: &mut Vec<DataId>) {}
}
156

157
#[typetag::serde]
1✔
158
#[async_trait]
159
impl VectorOperator for CsvSource {
160
    async fn _initialize(
3✔
161
        self: Box<Self>,
3✔
162
        _context: &dyn crate::engine::ExecutionContext,
3✔
163
    ) -> Result<Box<dyn InitializedVectorOperator>> {
3✔
164
        let initialized_source = InitializedCsvSource {
3✔
165
            result_descriptor: VectorResultDescriptor {
3✔
166
                data_type: VectorDataType::MultiPoint, // TODO: get as user input
3✔
167
                spatial_reference: SpatialReference::epsg_4326().into(), // TODO: get as user input
3✔
168
                columns: Default::default(), // TODO: get when source allows loading other columns
3✔
169
                time: None,
3✔
170
                bbox: None,
3✔
171
            },
3✔
172
            state: self.params,
3✔
173
        };
3✔
174

3✔
175
        Ok(initialized_source.boxed())
3✔
176
    }
3✔
177

178
    span_fn!(CsvSource);
×
179
}
180

181
/// An initialized CSV source: its result descriptor is fixed and it can
/// produce query processors.
pub struct InitializedCsvSource {
    result_descriptor: VectorResultDescriptor,
    state: CsvSourceParameters,
}
185

186
impl InitializedVectorOperator for InitializedCsvSource {
187
    fn query_processor(&self) -> Result<crate::engine::TypedVectorQueryProcessor> {
2✔
188
        Ok(TypedVectorQueryProcessor::MultiPoint(
2✔
189
            CsvSourceProcessor {
2✔
190
                params: self.state.clone(),
2✔
191
            }
2✔
192
            .boxed(),
2✔
193
        ))
2✔
194
    }
2✔
195

196
    fn result_descriptor(&self) -> &VectorResultDescriptor {
3✔
197
        &self.result_descriptor
3✔
198
    }
3✔
199
}
200

201
impl CsvSourceStream {
202
    /// Creates a new `CsvSource`
203
    ///
204
    /// # Errors
205
    ///
206
    /// This constructor fails if the delimiter is not an ASCII character.
207
    /// Furthermore, there are IO errors from the reader.
208
    ///
209
    // TODO: include time interval, e.g. QueryRectangle parameter
210
    pub fn new(
6✔
211
        parameters: CsvSourceParameters,
6✔
212
        bbox: BoundingBox2D,
6✔
213
        chunk_size: usize,
6✔
214
    ) -> Result<Self> {
6✔
215
        ensure!(
6✔
216
            parameters.field_separator.is_ascii(),
6✔
217
            error::CsvSource {
×
218
                details: "Delimiter must be ASCII character"
×
219
            }
×
220
        );
221

222
        Ok(Self {
223
            reader_state: Arc::new(Mutex::new(ReaderState::Untouched(
224
                csv::ReaderBuilder::new()
6✔
225
                    .delimiter(parameters.field_separator as u8)
6✔
226
                    .has_headers(true)
6✔
227
                    .from_path(parameters.file_path.as_path())
6✔
228
                    .context(error::CsvSourceReader {})?,
6✔
229
            ))),
230
            thread_is_computing: Arc::new(AtomicBool::new(false)),
6✔
231
            poll_result: Arc::new(Mutex::new(None)),
6✔
232
            parameters,
6✔
233
            bbox,
6✔
234
            chunk_size,
6✔
235
        })
236
    }
6✔
237

238
    fn setup_read(
239
        geometry_specification: CsvGeometrySpecification,
240
        csv_reader: &mut Reader<File>,
241
    ) -> Result<ParsedHeader> {
242
        csv_reader
6✔
243
            .seek(Position::new())
6✔
244
            .context(error::CsvSourceReader {})?; // start at beginning
6✔
245

246
        ensure!(
6✔
247
            csv_reader.has_headers(),
6✔
248
            error::CsvSource {
×
249
                details: "CSV file must contain header",
×
250
            }
×
251
        );
252

253
        let header = csv_reader.headers().context(error::CsvSourceReader)?;
6✔
254

255
        let CsvGeometrySpecification::XY { x, y } = geometry_specification;
6✔
256
        let x_index = header
6✔
257
            .iter()
6✔
258
            .position(|v| v == x)
6✔
259
            .context(error::CsvSource {
6✔
260
                details: "Cannot find x index in csv header",
6✔
261
            })?;
6✔
262
        let y_index = header
6✔
263
            .iter()
6✔
264
            .position(|v| v == y)
12✔
265
            .context(error::CsvSource {
6✔
266
                details: "Cannot find y index in csv header",
6✔
267
            })?;
6✔
268

269
        Ok(ParsedHeader {
5✔
270
            has_header: true,
5✔
271
            x_index,
5✔
272
            y_index,
5✔
273
        })
5✔
274
    }
6✔
275

276
    /// Parse a single CSV row
277
    fn parse_row(header: &ParsedHeader, row: &StringRecord) -> Result<ParsedRow> {
14✔
278
        let x: f64 = row
14✔
279
            .get(header.x_index)
14✔
280
            .context(error::CsvSource {
14✔
281
                details: "Cannot find x index key",
14✔
282
            })?
14✔
283
            .parse()
14✔
284
            .map_err(|_error| error::Error::CsvSource {
14✔
285
                details: "Cannot parse x coordinate".to_string(),
×
286
            })?;
14✔
287
        let y: f64 = row
14✔
288
            .get(header.y_index)
14✔
289
            .context(error::CsvSource {
14✔
290
                details: "Cannot find y index key",
14✔
291
            })?
14✔
292
            .parse()
14✔
293
            .map_err(|_error| error::Error::CsvSource {
14✔
294
                details: "Cannot parse y coordinate".to_string(),
×
295
            })?;
14✔
296

297
        Ok(ParsedRow {
14✔
298
            coordinate: (x, y).into(),
14✔
299
            time_interval: TimeInterval::default(),
14✔
300
        })
14✔
301
    }
14✔
302
}
303

304
impl Stream for CsvSourceStream {
305
    type Item = Result<MultiPointCollection>;
306

307
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
30✔
308
        // TODO: handle lock poisoning on multiple occasions
30✔
309

30✔
310
        if self.thread_is_computing.load(Ordering::Relaxed) {
30✔
311
            return Poll::Pending;
×
312
        }
30✔
313

30✔
314
        let mut poll_result = self.poll_result.lock().unwrap();
30✔
315
        if poll_result.is_some() {
30✔
316
            let x = poll_result.take().unwrap();
15✔
317
            return Poll::Ready(x);
15✔
318
        }
15✔
319

15✔
320
        self.thread_is_computing.store(true, Ordering::Relaxed);
15✔
321

15✔
322
        let is_working = self.thread_is_computing.clone();
15✔
323
        let reader_state = self.reader_state.clone();
15✔
324
        let poll_result = self.poll_result.clone();
15✔
325

15✔
326
        let bbox = self.bbox;
15✔
327
        let chunk_size = self.chunk_size;
15✔
328
        let parameters = self.parameters.clone();
15✔
329
        let waker = cx.waker().clone();
15✔
330

15✔
331
        crate::util::spawn_blocking(move || {
15✔
332
            let mut csv_reader = reader_state.lock().unwrap();
15✔
333
            let computation_result = || -> Result<Option<MultiPointCollection>> {
15✔
334
                // TODO: is clone necessary?
15✔
335
                let geometry_specification = parameters.geometry.clone();
15✔
336
                csv_reader.setup_once(geometry_specification)?;
15✔
337

338
                let (header, records) = match &mut *csv_reader {
14✔
339
                    ReaderState::OnGoing { header, records } => (header, records),
13✔
340
                    ReaderState::Error => return Ok(None),
1✔
341
                    ReaderState::Untouched(_) => unreachable!(),
×
342
                };
343

344
                let mut builder = MultiPointCollection::builder().finish_header();
13✔
345
                let mut number_of_entries = 0; // TODO: add size/len to builder
13✔
346

347
                while number_of_entries < chunk_size {
27✔
348
                    let Some(record) = records.next() else { break; };
24✔
349

350
                    let row = record.context(error::CsvSourceReader)?;
15✔
351
                    let parsed_row = CsvSourceStream::parse_row(header, &row)?;
14✔
352

353
                    // TODO: filter time
354
                    if bbox.contains_coordinate(&parsed_row.coordinate) {
14✔
355
                        builder.push_geometry(parsed_row.coordinate.into());
13✔
356
                        builder.push_time_interval(parsed_row.time_interval);
13✔
357
                        builder.finish_row();
13✔
358

13✔
359
                        number_of_entries += 1;
13✔
360
                    }
13✔
361
                }
362

363
                // TODO: is this the correct cancellation criterion?
364
                if number_of_entries > 0 {
12✔
365
                    let collection = builder.build()?;
7✔
366
                    Ok(Some(collection))
7✔
367
                } else {
368
                    Ok(None)
5✔
369
                }
370
            }();
15✔
371

372
            *poll_result.lock().unwrap() = Some(match computation_result {
15✔
373
                Ok(Some(collection)) => Some(Ok(collection)),
7✔
374
                Ok(None) => None,
6✔
375
                Err(e) => Some(Err(e)),
2✔
376
            });
377
            is_working.store(false, Ordering::Relaxed);
15✔
378

15✔
379
            waker.wake();
15✔
380
        });
15✔
381

15✔
382
        Poll::Pending
15✔
383
    }
30✔
384
}
385

386
/// Query processor that spawns a `CsvSourceStream` per query.
#[derive(Debug)]
struct CsvSourceProcessor {
    params: CsvSourceParameters,
}
390

391
#[async_trait]
392
impl QueryProcessor for CsvSourceProcessor {
393
    type Output = MultiPointCollection;
394
    type SpatialBounds = BoundingBox2D;
395

396
    async fn _query<'a>(
3✔
397
        &'a self,
3✔
398
        query: VectorQueryRectangle,
3✔
399
        _ctx: &'a dyn QueryContext,
3✔
400
    ) -> Result<BoxStream<'a, Result<Self::Output>>> {
3✔
401
        // TODO: properly handle chunk_size
402
        Ok(CsvSourceStream::new(self.params.clone(), query.spatial_bounds, 10)?.boxed())
3✔
403
    }
6✔
404
}
405

406
/// Column indices of the geometry fields, resolved from the CSV header.
#[derive(Clone, Copy, Debug)]
struct ParsedHeader {
    // Whether the file's first row is a header that must be skipped.
    pub has_header: bool,
    // Index of the x-coordinate column.
    pub x_index: usize,
    // Index of the y-coordinate column.
    pub y_index: usize,
}
412

413
/// A single successfully parsed CSV row.
struct ParsedRow {
    pub coordinate: Coordinate2D,
    // Always `TimeInterval::default()` until time columns are supported.
    pub time_interval: TimeInterval,
    // TODO: fields
}
418

419
#[cfg(test)]
mod tests {
    use std::io::{Seek, SeekFrom, Write};

    use geoengine_datatypes::primitives::SpatialResolution;

    use super::*;
    use crate::engine::MockQueryContext;
    use geoengine_datatypes::collections::{FeatureCollectionInfos, ToGeoJson};

    /// Three points read with chunk size 2: expect chunks of 2 and 1, then end of stream.
    #[tokio::test]
    async fn read_points() {
        let mut fake_file = tempfile::NamedTempFile::new().unwrap();
        write!(
            fake_file,
            "\
x,y
0,1
2,3
4,5
"
        )
        .unwrap();
        fake_file.seek(SeekFrom::Start(0)).unwrap();

        let mut csv_source = CsvSourceStream::new(
            CsvSourceParameters {
                file_path: fake_file.path().into(),
                field_separator: ',',
                geometry: CsvGeometrySpecification::XY {
                    x: "x".into(),
                    y: "y".into(),
                },
                time: CsvTimeSpecification::None,
            },
            BoundingBox2D::new_unchecked((0., 0.).into(), (5., 5.).into()),
            2,
        )
        .unwrap();

        assert_eq!(csv_source.next().await.unwrap().unwrap().len(), 2);
        assert_eq!(csv_source.next().await.unwrap().unwrap().len(), 1);
        assert!(csv_source.next().await.is_none());
    }

    /// A corrupt row yields an error item, but the stream continues with the
    /// remaining valid rows before terminating.
    #[tokio::test]
    async fn erroneous_point_rows() {
        let mut fake_file = tempfile::NamedTempFile::new().unwrap();
        write!(
            fake_file,
            "\
x,y
0,1
CORRUPT
4,5
"
        )
        .unwrap();
        fake_file.seek(SeekFrom::Start(0)).unwrap();

        let mut csv_source = CsvSourceStream::new(
            CsvSourceParameters {
                file_path: fake_file.path().into(),
                field_separator: ',',
                geometry: CsvGeometrySpecification::XY {
                    x: "x".into(),
                    y: "y".into(),
                },
                time: CsvTimeSpecification::None,
            },
            BoundingBox2D::new_unchecked((0., 0.).into(), (5., 5.).into()),
            1,
        )
        .unwrap();

        assert_eq!(csv_source.next().await.unwrap().unwrap().len(), 1);
        assert!(csv_source.next().await.unwrap().is_err());
        assert_eq!(csv_source.next().await.unwrap().unwrap().len(), 1);
        assert!(csv_source.next().await.is_none());
    }

    /// A header lacking the configured `y` column fails on the first poll and
    /// then terminates the stream.
    #[tokio::test]
    async fn corrupt_point_header() {
        let mut fake_file = tempfile::NamedTempFile::new().unwrap();
        write!(
            fake_file,
            "\
x,z
0,1
2,3
4,5
"
        )
        .unwrap();
        fake_file.seek(SeekFrom::Start(0)).unwrap();

        let mut csv_source = CsvSourceStream::new(
            CsvSourceParameters {
                file_path: fake_file.path().into(),
                field_separator: ',',
                geometry: CsvGeometrySpecification::XY {
                    x: "x".into(),
                    y: "y".into(),
                },
                time: CsvTimeSpecification::None,
            },
            BoundingBox2D::new_unchecked((0., 0.).into(), (5., 5.).into()),
            1,
        )
        .unwrap();

        assert!(csv_source.next().await.unwrap().is_err());
        assert!(csv_source.next().await.is_none());
    }

    /// Full query through the processor: only the two points inside the query
    /// bounding box appear in the GeoJSON output.
    #[tokio::test]
    async fn processor() {
        let mut fake_file = tempfile::NamedTempFile::new().unwrap();
        write!(
            fake_file,
            "\
x,y
0,1
2,3
4,5
"
        )
        .unwrap();
        fake_file.seek(SeekFrom::Start(0)).unwrap();

        let params = CsvSourceParameters {
            file_path: fake_file.path().into(),
            field_separator: ',',
            geometry: CsvGeometrySpecification::XY {
                x: "x".into(),
                y: "y".into(),
            },
            time: CsvTimeSpecification::None,
        };

        let p = CsvSourceProcessor { params };

        let query = VectorQueryRectangle {
            spatial_bounds: BoundingBox2D::new_unchecked(
                Coordinate2D::new(0., 0.),
                Coordinate2D::new(3., 3.),
            ),
            time_interval: TimeInterval::new_unchecked(0, 1),
            spatial_resolution: SpatialResolution::zero_point_one(),
        };
        let ctx = MockQueryContext::new((10 * 8 * 2).into());

        let r: Vec<Result<MultiPointCollection>> =
            p.query(query, &ctx).await.unwrap().collect().await;

        assert_eq!(r.len(), 1);

        assert_eq!(
            r[0].as_ref().unwrap().to_geo_json(),
            serde_json::json!({
                "type": "FeatureCollection",
                "features": [{
                    "type": "Feature",
                    "geometry": {
                        "type": "Point",
                        "coordinates": [0.0, 1.0]
                    },
                    "properties": {},
                    "when": {
                        "start": "-262144-01-01T00:00:00+00:00",
                        "end": "+262143-12-31T23:59:59.999+00:00",
                        "type": "Interval"
                    }
                }, {
                    "type": "Feature",
                    "geometry": {
                        "type": "Point",
                        "coordinates": [2.0, 3.0]
                    },
                    "properties": {},
                    "when": {
                        "start": "-262144-01-01T00:00:00+00:00",
                        "end": "+262143-12-31T23:59:59.999+00:00",
                        "type": "Interval"
                    }
                }]
            })
            .to_string()
        );
    }

    /// Round-trips the boxed operator through serde JSON, checking the exact
    /// camelCase serialization and that it deserializes again.
    #[test]
    fn operator() {
        let mut temp_file = tempfile::NamedTempFile::new().unwrap();
        write!(
            temp_file,
            "\
x;y
0;1
2;3
4;5
"
        )
        .unwrap();
        temp_file.seek(SeekFrom::Start(0)).unwrap();

        let params = CsvSourceParameters {
            file_path: temp_file.path().into(),
            field_separator: ';',
            geometry: CsvGeometrySpecification::XY {
                x: "x".into(),
                y: "y".into(),
            },
            time: CsvTimeSpecification::None,
        };

        let operator = CsvSource { params }.boxed();

        let operator_json = serde_json::to_value(&operator).unwrap();

        assert_eq!(
            operator_json,
            serde_json::json!({
                "type": "CsvSource",
                "params": {
                    "filePath": temp_file.path(),
                    "fieldSeparator": ";",
                    "geometry": {
                        "type": "xy",
                        "x": "x",
                        "y": "y"
                    },
                    "time": "None"
                }
            })
        );

        let _operator: Box<dyn VectorOperator> = serde_json::from_value(operator_json).unwrap();
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc