
geo-engine / geoengine / build 5006008836 (pending completion)

GitHub push: Merge #785 #787

936 of 936 new or added lines in 50 files covered. (100.0%)

96010 of 107707 relevant lines covered (89.14%)

72676.46 hits per line

Source file: /operators/src/source/csv.rs (95.02% covered)
use std::path::PathBuf;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::{fs::File, sync::atomic::AtomicBool};

use csv::{Position, Reader, StringRecord};
use futures::stream::BoxStream;
use futures::task::{Context, Poll};
use futures::{Stream, StreamExt};
use geoengine_datatypes::dataset::DataId;
use geoengine_datatypes::primitives::VectorQueryRectangle;
use serde::{Deserialize, Serialize};
use snafu::{ensure, OptionExt, ResultExt};

use geoengine_datatypes::collections::{
    BuilderProvider, GeoFeatureCollectionRowBuilder, MultiPointCollection, VectorDataType,
};
use geoengine_datatypes::{
    primitives::{BoundingBox2D, Coordinate2D, TimeInterval},
    spatial_reference::SpatialReference,
};

use crate::engine::{
    CanonicOperatorName, InitializedVectorOperator, OperatorData, OperatorName, QueryContext,
    SourceOperator, TypedVectorQueryProcessor, VectorOperator, VectorQueryProcessor,
    VectorResultDescriptor,
};
use crate::engine::{QueryProcessor, WorkflowOperatorPath};
use crate::error;
use crate::util::Result;
use async_trait::async_trait;
use std::sync::atomic::Ordering;

/// Parameters for the CSV Source Operator
///
/// # Examples
///
/// ```rust
/// use serde_json::{Result, Value};
/// use geoengine_operators::source::{CsvSourceParameters, CsvSource};
/// use geoengine_operators::source::{CsvGeometrySpecification, CsvTimeSpecification};
///
/// let json_string = r#"
///     {
///         "type": "CsvSource",
///         "params": {
///             "filePath": "/foo/bar.csv",
///             "fieldSeparator": ",",
///             "geometry": {
///                 "type": "xy",
///                 "x": "x",
///                 "y": "y"
///             }
///         }
///     }"#;
///
/// let operator: CsvSource = serde_json::from_str(json_string).unwrap();
///
/// assert_eq!(operator, CsvSource {
///     params: CsvSourceParameters {
///         file_path: "/foo/bar.csv".into(),
///         field_separator: ',',
///         geometry: CsvGeometrySpecification::XY { x: "x".into(), y: "y".into() },
///         time: CsvTimeSpecification::None,
///     },
/// });
/// ```
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct CsvSourceParameters {
    pub file_path: PathBuf,
    pub field_separator: char,
    pub geometry: CsvGeometrySpecification,
    #[serde(default)]
    pub time: CsvTimeSpecification,
}

#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone)]
#[serde(tag = "type", rename_all = "lowercase")]
pub enum CsvGeometrySpecification {
    #[allow(clippy::upper_case_acronyms)]
    XY { x: String, y: String },
}

#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone)]
pub enum CsvTimeSpecification {
    None,
}

impl Default for CsvTimeSpecification {
    fn default() -> Self {
        Self::None
    }
}

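// Internal state machine for the CSV reader: a reader starts out as `Untouched`,
// moves to `OnGoing` once the header has been parsed on the first read, and uses
// `Error` both as the temporary placeholder during the `std::mem::replace` in
// `setup_once` and as the terminal state when header parsing fails.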
enum ReaderState {
    Untouched(Reader<File>),
    OnGoing {
        header: ParsedHeader,
        records: csv::StringRecordsIntoIter<File>,
    },
    Error,
}

impl ReaderState {
    pub fn setup_once(&mut self, geometry_specification: CsvGeometrySpecification) -> Result<()> {
        if let ReaderState::Untouched(..) = self {
            // pass
        } else {
            return Ok(());
        }

        let old_state = std::mem::replace(self, ReaderState::Error);

        if let ReaderState::Untouched(mut csv_reader) = old_state {
            let header = match CsvSourceStream::setup_read(geometry_specification, &mut csv_reader)
            {
                Ok(header) => header,
                Err(error) => return Err(error),
            };

            let mut records = csv_reader.into_records();

            // consume the first row, which is the header
            if header.has_header {
                // TODO: throw error
                records.next();
            }

            *self = ReaderState::OnGoing { header, records }
        }

        Ok(())
    }
}

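// The stream delegates the actual CSV parsing to a blocking task (see the `Stream`
// impl below). `thread_is_computing` records whether such a task is in flight, and
// `poll_result` is a single-slot mailbox: `Some(Some(..))` holds a finished chunk or
// an error, `Some(None)` signals end of stream, and `None` means nothing has been
// produced yet.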
pub struct CsvSourceStream {
    parameters: CsvSourceParameters,
    bbox: BoundingBox2D,
    chunk_size: usize,
    reader_state: Arc<Mutex<ReaderState>>,
    thread_is_computing: Arc<AtomicBool>,
    #[allow(clippy::option_option)]
    poll_result: Arc<Mutex<Option<Option<Result<MultiPointCollection>>>>>,
}

pub type CsvSource = SourceOperator<CsvSourceParameters>;

impl OperatorName for CsvSource {
    const TYPE_NAME: &'static str = "CsvSource";
}

impl OperatorData for CsvSourceParameters {
    fn data_ids_collect(&self, _data_ids: &mut Vec<DataId>) {}
}

#[typetag::serde]
#[async_trait]
impl VectorOperator for CsvSource {
    async fn _initialize(
        self: Box<Self>,
        _path: WorkflowOperatorPath,
        _context: &dyn crate::engine::ExecutionContext,
    ) -> Result<Box<dyn InitializedVectorOperator>> {
        let initialized_source = InitializedCsvSource {
            name: CanonicOperatorName::from(&self),
            result_descriptor: VectorResultDescriptor {
                data_type: VectorDataType::MultiPoint, // TODO: get as user input
                spatial_reference: SpatialReference::epsg_4326().into(), // TODO: get as user input
                columns: Default::default(), // TODO: get when source allows loading other columns
                time: None,
                bbox: None,
            },
            state: self.params,
        };

        Ok(initialized_source.boxed())
    }

    span_fn!(CsvSource);
}

pub struct InitializedCsvSource {
    name: CanonicOperatorName,
    result_descriptor: VectorResultDescriptor,
    state: CsvSourceParameters,
}

impl InitializedVectorOperator for InitializedCsvSource {
    fn query_processor(&self) -> Result<crate::engine::TypedVectorQueryProcessor> {
        Ok(TypedVectorQueryProcessor::MultiPoint(
            CsvSourceProcessor {
                params: self.state.clone(),
            }
            .boxed(),
        ))
    }

    fn result_descriptor(&self) -> &VectorResultDescriptor {
        &self.result_descriptor
    }

    fn canonic_name(&self) -> CanonicOperatorName {
        self.name.clone()
    }
}

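// A minimal usage sketch (hypothetical `params` and `bbox`, assuming a Tokio
// runtime; the tests at the end of this file construct concrete values):
//
//     let mut stream = CsvSourceStream::new(params, bbox, 10)?;
//     while let Some(chunk) = stream.next().await {
//         let collection: MultiPointCollection = chunk?;
//         // work with the chunk of points ...
//     }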
impl CsvSourceStream {
    /// Creates a new `CsvSourceStream`
    ///
    /// # Errors
    ///
    /// This constructor fails if the delimiter is not an ASCII character.
    /// It can also fail with I/O errors from the underlying CSV reader.
    ///
    // TODO: include time interval, e.g. QueryRectangle parameter
    pub fn new(
        parameters: CsvSourceParameters,
        bbox: BoundingBox2D,
        chunk_size: usize,
    ) -> Result<Self> {
        ensure!(
            parameters.field_separator.is_ascii(),
            error::CsvSource {
                details: "Delimiter must be ASCII character"
            }
        );

        Ok(Self {
            reader_state: Arc::new(Mutex::new(ReaderState::Untouched(
                csv::ReaderBuilder::new()
                    .delimiter(parameters.field_separator as u8)
                    .has_headers(true)
                    .from_path(parameters.file_path.as_path())
                    .context(error::CsvSourceReader {})?,
            ))),
            thread_is_computing: Arc::new(AtomicBool::new(false)),
            poll_result: Arc::new(Mutex::new(None)),
            parameters,
            bbox,
            chunk_size,
        })
    }

    fn setup_read(
        geometry_specification: CsvGeometrySpecification,
        csv_reader: &mut Reader<File>,
    ) -> Result<ParsedHeader> {
        csv_reader
            .seek(Position::new())
            .context(error::CsvSourceReader {})?; // start at beginning

        ensure!(
            csv_reader.has_headers(),
            error::CsvSource {
                details: "CSV file must contain header",
            }
        );

        let header = csv_reader.headers().context(error::CsvSourceReader)?;

        let CsvGeometrySpecification::XY { x, y } = geometry_specification;
        let x_index = header
            .iter()
            .position(|v| v == x)
            .context(error::CsvSource {
                details: "Cannot find x index in csv header",
            })?;
        let y_index = header
            .iter()
            .position(|v| v == y)
            .context(error::CsvSource {
                details: "Cannot find y index in csv header",
            })?;

        Ok(ParsedHeader {
            has_header: true,
            x_index,
            y_index,
        })
    }

    /// Parse a single CSV row
    fn parse_row(header: &ParsedHeader, row: &StringRecord) -> Result<ParsedRow> {
        let x: f64 = row
            .get(header.x_index)
            .context(error::CsvSource {
                details: "Cannot find x index key",
            })?
            .parse()
            .map_err(|_error| error::Error::CsvSource {
                details: "Cannot parse x coordinate".to_string(),
            })?;
        let y: f64 = row
            .get(header.y_index)
            .context(error::CsvSource {
                details: "Cannot find y index key",
            })?
            .parse()
            .map_err(|_error| error::Error::CsvSource {
                details: "Cannot parse y coordinate".to_string(),
            })?;

        Ok(ParsedRow {
            coordinate: (x, y).into(),
            time_interval: TimeInterval::default(),
        })
    }
}

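// Polling protocol: a call to `poll_next` that finds neither a running task nor a
// stored result spawns a blocking task that reads up to `chunk_size` rows inside
// `bbox`, writes the outcome into `poll_result`, clears `thread_is_computing`, and
// wakes the task via the captured waker; the poll itself returns `Poll::Pending`,
// and the follow-up poll takes the stored result out of the mailbox.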
impl Stream for CsvSourceStream {
    type Item = Result<MultiPointCollection>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // TODO: handle lock poisoning on multiple occasions

        if self.thread_is_computing.load(Ordering::Relaxed) {
            return Poll::Pending;
        }

        let mut poll_result = self.poll_result.lock().unwrap();
        if poll_result.is_some() {
            let x = poll_result.take().unwrap();
            return Poll::Ready(x);
        }

        self.thread_is_computing.store(true, Ordering::Relaxed);

        let is_working = self.thread_is_computing.clone();
        let reader_state = self.reader_state.clone();
        let poll_result = self.poll_result.clone();

        let bbox = self.bbox;
        let chunk_size = self.chunk_size;
        let parameters = self.parameters.clone();
        let waker = cx.waker().clone();

        crate::util::spawn_blocking(move || {
            let mut csv_reader = reader_state.lock().unwrap();
            let computation_result = || -> Result<Option<MultiPointCollection>> {
                // TODO: is clone necessary?
                let geometry_specification = parameters.geometry.clone();
                csv_reader.setup_once(geometry_specification)?;

                let (header, records) = match &mut *csv_reader {
                    ReaderState::OnGoing { header, records } => (header, records),
                    ReaderState::Error => return Ok(None),
                    ReaderState::Untouched(_) => unreachable!(),
                };

                let mut builder = MultiPointCollection::builder().finish_header();
                let mut number_of_entries = 0; // TODO: add size/len to builder

                while number_of_entries < chunk_size {
                    let Some(record) = records.next() else { break; };

                    let row = record.context(error::CsvSourceReader)?;
                    let parsed_row = CsvSourceStream::parse_row(header, &row)?;

                    // TODO: filter time
                    if bbox.contains_coordinate(&parsed_row.coordinate) {
                        builder.push_geometry(parsed_row.coordinate.into());
                        builder.push_time_interval(parsed_row.time_interval);
                        builder.finish_row();

                        number_of_entries += 1;
                    }
                }

                // TODO: is this the correct cancellation criterion?
                if number_of_entries > 0 {
                    let collection = builder.build()?;
                    Ok(Some(collection))
                } else {
                    Ok(None)
                }
            }();

            *poll_result.lock().unwrap() = Some(match computation_result {
                Ok(Some(collection)) => Some(Ok(collection)),
                Ok(None) => None,
                Err(e) => Some(Err(e)),
            });
            is_working.store(false, Ordering::Relaxed);

            waker.wake();
        });

        Poll::Pending
    }
}

#[derive(Debug)]
struct CsvSourceProcessor {
    params: CsvSourceParameters,
}

#[async_trait]
impl QueryProcessor for CsvSourceProcessor {
    type Output = MultiPointCollection;
    type SpatialBounds = BoundingBox2D;

    async fn _query<'a>(
        &'a self,
        query: VectorQueryRectangle,
        _ctx: &'a dyn QueryContext,
    ) -> Result<BoxStream<'a, Result<Self::Output>>> {
        // TODO: properly handle chunk_size
        Ok(CsvSourceStream::new(self.params.clone(), query.spatial_bounds, 10)?.boxed())
    }
}

#[derive(Clone, Copy, Debug)]
struct ParsedHeader {
    pub has_header: bool,
    pub x_index: usize,
    pub y_index: usize,
}

struct ParsedRow {
    pub coordinate: Coordinate2D,
    pub time_interval: TimeInterval,
    // TODO: fields
}

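// The tests below cover the regular read path (`read_points`), error reporting for
// corrupt rows and a corrupt header, a full query through `CsvSourceProcessor`
// including its GeoJSON output, and round-tripping a `CsvSource` operator through
// its serde JSON representation.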
#[cfg(test)]
mod tests {
    use std::io::{Seek, SeekFrom, Write};

    use geoengine_datatypes::primitives::SpatialResolution;

    use super::*;
    use crate::engine::MockQueryContext;
    use geoengine_datatypes::collections::{FeatureCollectionInfos, ToGeoJson};

    #[tokio::test]
    async fn read_points() {
        let mut fake_file = tempfile::NamedTempFile::new().unwrap();
        write!(
            fake_file,
            "\
x,y
0,1
2,3
4,5
"
        )
        .unwrap();
        fake_file.seek(SeekFrom::Start(0)).unwrap();

        let mut csv_source = CsvSourceStream::new(
            CsvSourceParameters {
                file_path: fake_file.path().into(),
                field_separator: ',',
                geometry: CsvGeometrySpecification::XY {
                    x: "x".into(),
                    y: "y".into(),
                },
                time: CsvTimeSpecification::None,
            },
            BoundingBox2D::new_unchecked((0., 0.).into(), (5., 5.).into()),
            2,
        )
        .unwrap();

        assert_eq!(csv_source.next().await.unwrap().unwrap().len(), 2);
        assert_eq!(csv_source.next().await.unwrap().unwrap().len(), 1);
        assert!(csv_source.next().await.is_none());
    }

    #[tokio::test]
    async fn erroneous_point_rows() {
        let mut fake_file = tempfile::NamedTempFile::new().unwrap();
        write!(
            fake_file,
            "\
x,y
0,1
CORRUPT
4,5
"
        )
        .unwrap();
        fake_file.seek(SeekFrom::Start(0)).unwrap();

        let mut csv_source = CsvSourceStream::new(
            CsvSourceParameters {
                file_path: fake_file.path().into(),
                field_separator: ',',
                geometry: CsvGeometrySpecification::XY {
                    x: "x".into(),
                    y: "y".into(),
                },
                time: CsvTimeSpecification::None,
            },
            BoundingBox2D::new_unchecked((0., 0.).into(), (5., 5.).into()),
            1,
        )
        .unwrap();

        assert_eq!(csv_source.next().await.unwrap().unwrap().len(), 1);
        assert!(csv_source.next().await.unwrap().is_err());
        assert_eq!(csv_source.next().await.unwrap().unwrap().len(), 1);
        assert!(csv_source.next().await.is_none());
    }

    #[tokio::test]
    async fn corrupt_point_header() {
        let mut fake_file = tempfile::NamedTempFile::new().unwrap();
        write!(
            fake_file,
            "\
x,z
0,1
2,3
4,5
"
        )
        .unwrap();
        fake_file.seek(SeekFrom::Start(0)).unwrap();

        let mut csv_source = CsvSourceStream::new(
            CsvSourceParameters {
                file_path: fake_file.path().into(),
                field_separator: ',',
                geometry: CsvGeometrySpecification::XY {
                    x: "x".into(),
                    y: "y".into(),
                },
                time: CsvTimeSpecification::None,
            },
            BoundingBox2D::new_unchecked((0., 0.).into(), (5., 5.).into()),
            1,
        )
        .unwrap();

        assert!(csv_source.next().await.unwrap().is_err());
        assert!(csv_source.next().await.is_none());
    }

    #[tokio::test]
    async fn processor() {
        let mut fake_file = tempfile::NamedTempFile::new().unwrap();
        write!(
            fake_file,
            "\
x,y
0,1
2,3
4,5
"
        )
        .unwrap();
        fake_file.seek(SeekFrom::Start(0)).unwrap();

        let params = CsvSourceParameters {
            file_path: fake_file.path().into(),
            field_separator: ',',
            geometry: CsvGeometrySpecification::XY {
                x: "x".into(),
                y: "y".into(),
            },
            time: CsvTimeSpecification::None,
        };

        let p = CsvSourceProcessor { params };

        let query = VectorQueryRectangle {
            spatial_bounds: BoundingBox2D::new_unchecked(
                Coordinate2D::new(0., 0.),
                Coordinate2D::new(3., 3.),
            ),
            time_interval: TimeInterval::new_unchecked(0, 1),
            spatial_resolution: SpatialResolution::zero_point_one(),
        };
        let ctx = MockQueryContext::new((10 * 8 * 2).into());

        let r: Vec<Result<MultiPointCollection>> =
            p.query(query, &ctx).await.unwrap().collect().await;

        assert_eq!(r.len(), 1);

        assert_eq!(
            r[0].as_ref().unwrap().to_geo_json(),
            serde_json::json!({
                "type": "FeatureCollection",
                "features": [{
                    "type": "Feature",
                    "geometry": {
                        "type": "Point",
                        "coordinates": [0.0, 1.0]
                    },
                    "properties": {},
                    "when": {
                        "start": "-262144-01-01T00:00:00+00:00",
                        "end": "+262143-12-31T23:59:59.999+00:00",
                        "type": "Interval"
                    }
                }, {
                    "type": "Feature",
                    "geometry": {
                        "type": "Point",
                        "coordinates": [2.0, 3.0]
                    },
                    "properties": {},
                    "when": {
                        "start": "-262144-01-01T00:00:00+00:00",
                        "end": "+262143-12-31T23:59:59.999+00:00",
                        "type": "Interval"
                    }
                }]
            })
            .to_string()
        );
    }

    #[test]
    fn operator() {
        let mut temp_file = tempfile::NamedTempFile::new().unwrap();
        write!(
            temp_file,
            "\
x;y
0;1
2;3
4;5
"
        )
        .unwrap();
        temp_file.seek(SeekFrom::Start(0)).unwrap();

        let params = CsvSourceParameters {
            file_path: temp_file.path().into(),
            field_separator: ';',
            geometry: CsvGeometrySpecification::XY {
                x: "x".into(),
                y: "y".into(),
            },
            time: CsvTimeSpecification::None,
        };

        let operator = CsvSource { params }.boxed();

        let operator_json = serde_json::to_value(&operator).unwrap();

        assert_eq!(
            operator_json,
            serde_json::json!({
                "type": "CsvSource",
                "params": {
                    "filePath": temp_file.path(),
                    "fieldSeparator": ";",
                    "geometry": {
                        "type": "xy",
                        "x": "x",
                        "y": "y"
                    },
                    "time": "None"
                }
            })
        );

        let _operator: Box<dyn VectorOperator> = serde_json::from_value(operator_json).unwrap();
    }
}