• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 16167706152

09 Jul 2025 11:08AM UTC coverage: 88.738% (-1.0%) from 89.762%
16167706152

push

github

web-flow
refactor: Updates-2025-07-02 (#1062)

* rust 1.88

* clippy auto-fixes

* manual clippy fixes

* update deps

* cargo update

* update onnx

* cargo fmt

* update sqlfluff

121 of 142 new or added lines in 29 files covered. (85.21%)

300 existing lines in 88 files now uncovered.

111259 of 125379 relevant lines covered (88.74%)

77910.92 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.71
/operators/src/source/csv.rs
1
use std::path::PathBuf;
2
use std::pin::Pin;
3
use std::sync::{Arc, Mutex};
4
use std::{fs::File, sync::atomic::AtomicBool};
5

6
use csv::{Position, Reader, StringRecord};
7
use futures::stream::BoxStream;
8
use futures::task::{Context, Poll};
9
use futures::{Stream, StreamExt};
10
use geoengine_datatypes::dataset::NamedData;
11
use geoengine_datatypes::primitives::{ColumnSelection, VectorQueryRectangle};
12
use serde::{Deserialize, Serialize};
13
use snafu::{OptionExt, ResultExt, ensure};
14

15
use geoengine_datatypes::collections::{
16
    BuilderProvider, GeoFeatureCollectionRowBuilder, MultiPointCollection, VectorDataType,
17
};
18
use geoengine_datatypes::{
19
    primitives::{BoundingBox2D, Coordinate2D, TimeInterval},
20
    spatial_reference::SpatialReference,
21
};
22

23
use crate::engine::{
24
    CanonicOperatorName, InitializedVectorOperator, OperatorData, OperatorName, QueryContext,
25
    SourceOperator, TypedVectorQueryProcessor, VectorOperator, VectorQueryProcessor,
26
    VectorResultDescriptor,
27
};
28
use crate::engine::{QueryProcessor, WorkflowOperatorPath};
29
use crate::error;
30
use crate::util::{Result, safe_lock_mutex};
31
use async_trait::async_trait;
32
use std::sync::atomic::Ordering;
33

34
/// Parameters for the CSV Source Operator
35
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone)]
36
#[serde(rename_all = "camelCase")]
37
pub struct CsvSourceParameters {
38
    pub file_path: PathBuf,
39
    pub field_separator: char,
40
    pub geometry: CsvGeometrySpecification,
41
    #[serde(default)]
42
    pub time: CsvTimeSpecification,
43
}
44

45
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone)]
46
#[serde(tag = "type", rename_all = "lowercase")]
47
pub enum CsvGeometrySpecification {
48
    #[allow(clippy::upper_case_acronyms)]
49
    XY { x: String, y: String },
50
}
51

52
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone)]
53
pub enum CsvTimeSpecification {
54
    None,
55
}
56

57
impl Default for CsvTimeSpecification {
58
    fn default() -> Self {
1✔
59
        Self::None
1✔
60
    }
1✔
61
}
62

63
enum ReaderState {
64
    Untouched(Reader<File>),
65
    OnGoing {
66
        header: ParsedHeader,
67
        records: csv::StringRecordsIntoIter<File>,
68
    },
69
    Error,
70
}
71

72
impl ReaderState {
73
    pub fn setup_once(&mut self, geometry_specification: CsvGeometrySpecification) -> Result<()> {
17✔
74
        if let ReaderState::Untouched(..) = self {
17✔
75
            // pass
7✔
76
        } else {
7✔
77
            return Ok(());
10✔
78
        }
79

80
        let old_state = std::mem::replace(self, ReaderState::Error);
7✔
81

82
        if let ReaderState::Untouched(mut csv_reader) = old_state {
7✔
83
            let header = CsvSourceStream::setup_read(geometry_specification, &mut csv_reader)?;
7✔
84

85
            let mut records = csv_reader.into_records();
6✔
86

87
            // consume the first row, which is the header
88
            if header.has_header {
6✔
89
                // TODO: throw error
6✔
90
                records.next();
6✔
91
            }
6✔
92

93
            *self = ReaderState::OnGoing { header, records }
6✔
94
        }
×
95

96
        Ok(())
6✔
97
    }
17✔
98
}
99

100
pub struct CsvSourceStream {
101
    parameters: CsvSourceParameters,
102
    bbox: BoundingBox2D,
103
    chunk_size: usize,
104
    reader_state: Arc<Mutex<ReaderState>>,
105
    thread_is_computing: Arc<AtomicBool>,
106
    #[allow(clippy::option_option)]
107
    poll_result: Arc<Mutex<Option<Option<Result<MultiPointCollection>>>>>,
108
}
109

110
pub type CsvSource = SourceOperator<CsvSourceParameters>;
111

112
impl OperatorName for CsvSource {
113
    const TYPE_NAME: &'static str = "CsvSource";
114
}
115

116
impl OperatorData for CsvSourceParameters {
117
    fn data_names_collect(&self, _data_names: &mut Vec<NamedData>) {}
×
118
}
119

UNCOV
120
#[typetag::serde]
×
121
#[async_trait]
122
impl VectorOperator for CsvSource {
123
    async fn _initialize(
124
        self: Box<Self>,
125
        path: WorkflowOperatorPath,
126
        _context: &dyn crate::engine::ExecutionContext,
127
    ) -> Result<Box<dyn InitializedVectorOperator>> {
8✔
128
        let initialized_source = InitializedCsvSource {
4✔
129
            name: CanonicOperatorName::from(&self),
4✔
130
            path,
4✔
131
            result_descriptor: VectorResultDescriptor {
4✔
132
                data_type: VectorDataType::MultiPoint, // TODO: get as user input
4✔
133
                spatial_reference: SpatialReference::epsg_4326().into(), // TODO: get as user input
4✔
134
                columns: Default::default(), // TODO: get when source allows loading other columns
4✔
135
                time: None,
4✔
136
                bbox: None,
4✔
137
            },
4✔
138
            state: self.params,
4✔
139
        };
4✔
140

141
        Ok(initialized_source.boxed())
4✔
142
    }
8✔
143

144
    span_fn!(CsvSource);
145
}
146

147
pub struct InitializedCsvSource {
148
    name: CanonicOperatorName,
149
    path: WorkflowOperatorPath,
150
    result_descriptor: VectorResultDescriptor,
151
    state: CsvSourceParameters,
152
}
153

154
impl InitializedVectorOperator for InitializedCsvSource {
155
    fn query_processor(&self) -> Result<crate::engine::TypedVectorQueryProcessor> {
3✔
156
        Ok(TypedVectorQueryProcessor::MultiPoint(
3✔
157
            CsvSourceProcessor {
3✔
158
                params: self.state.clone(),
3✔
159
                result_descriptor: self.result_descriptor.clone(),
3✔
160
            }
3✔
161
            .boxed(),
3✔
162
        ))
3✔
163
    }
3✔
164

165
    fn result_descriptor(&self) -> &VectorResultDescriptor {
4✔
166
        &self.result_descriptor
4✔
167
    }
4✔
168

169
    fn canonic_name(&self) -> CanonicOperatorName {
×
170
        self.name.clone()
×
171
    }
×
172

173
    fn name(&self) -> &'static str {
3✔
174
        CsvSource::TYPE_NAME
3✔
175
    }
3✔
176

177
    fn path(&self) -> WorkflowOperatorPath {
3✔
178
        self.path.clone()
3✔
179
    }
3✔
180
}
181

182
impl CsvSourceStream {
183
    /// Creates a new `CsvSource`
184
    ///
185
    /// # Errors
186
    ///
187
    /// This constructor fails if the delimiter is not an ASCII character.
188
    /// Furthermore, there are IO errors from the reader.
189
    ///
190
    // TODO: include time interval, e.g. QueryRectangle parameter
191
    pub fn new(
7✔
192
        parameters: CsvSourceParameters,
7✔
193
        bbox: BoundingBox2D,
7✔
194
        chunk_size: usize,
7✔
195
    ) -> Result<Self> {
7✔
196
        ensure!(
7✔
197
            parameters.field_separator.is_ascii(),
7✔
198
            error::CsvSource {
×
199
                details: "Delimiter must be ASCII character"
×
200
            }
×
201
        );
202

203
        Ok(Self {
204
            reader_state: Arc::new(Mutex::new(ReaderState::Untouched(
7✔
205
                csv::ReaderBuilder::new()
7✔
206
                    .delimiter(parameters.field_separator as u8)
7✔
207
                    .has_headers(true)
7✔
208
                    .from_path(parameters.file_path.as_path())
7✔
209
                    .context(error::CsvSourceReader {})?,
7✔
210
            ))),
211
            thread_is_computing: Arc::new(AtomicBool::new(false)),
7✔
212
            poll_result: Arc::new(Mutex::new(None)),
7✔
213
            parameters,
7✔
214
            bbox,
7✔
215
            chunk_size,
7✔
216
        })
217
    }
7✔
218

219
    fn setup_read(
7✔
220
        geometry_specification: CsvGeometrySpecification,
7✔
221
        csv_reader: &mut Reader<File>,
7✔
222
    ) -> Result<ParsedHeader> {
7✔
223
        csv_reader
7✔
224
            .seek(Position::new())
7✔
225
            .context(error::CsvSourceReader {})?; // start at beginning
7✔
226

227
        ensure!(
7✔
228
            csv_reader.has_headers(),
7✔
229
            error::CsvSource {
×
230
                details: "CSV file must contain header",
×
231
            }
×
232
        );
233

234
        let header = csv_reader.headers().context(error::CsvSourceReader)?;
7✔
235

236
        let CsvGeometrySpecification::XY { x, y } = geometry_specification;
7✔
237
        let x_index = header
7✔
238
            .iter()
7✔
239
            .position(|v| v == x)
7✔
240
            .context(error::CsvSource {
7✔
241
                details: "Cannot find x index in csv header",
7✔
242
            })?;
7✔
243
        let y_index = header
7✔
244
            .iter()
7✔
245
            .position(|v| v == y)
14✔
246
            .context(error::CsvSource {
7✔
247
                details: "Cannot find y index in csv header",
7✔
248
            })?;
7✔
249

250
        Ok(ParsedHeader {
6✔
251
            has_header: true,
6✔
252
            x_index,
6✔
253
            y_index,
6✔
254
        })
6✔
255
    }
7✔
256

257
    /// Parse a single CSV row
258
    fn parse_row(header: &ParsedHeader, row: &StringRecord) -> Result<ParsedRow> {
17✔
259
        let x: f64 = row
17✔
260
            .get(header.x_index)
17✔
261
            .context(error::CsvSource {
17✔
262
                details: "Cannot find x index key",
17✔
263
            })?
17✔
264
            .parse()
17✔
265
            .map_err(|_error| error::Error::CsvSource {
17✔
266
                details: "Cannot parse x coordinate".to_string(),
×
UNCOV
267
            })?;
×
268
        let y: f64 = row
17✔
269
            .get(header.y_index)
17✔
270
            .context(error::CsvSource {
17✔
271
                details: "Cannot find y index key",
17✔
272
            })?
17✔
273
            .parse()
17✔
274
            .map_err(|_error| error::Error::CsvSource {
17✔
275
                details: "Cannot parse y coordinate".to_string(),
×
UNCOV
276
            })?;
×
277

278
        Ok(ParsedRow {
17✔
279
            coordinate: (x, y).into(),
17✔
280
            time_interval: TimeInterval::default(),
17✔
281
        })
17✔
282
    }
17✔
283
}
284

285
impl Stream for CsvSourceStream {
286
    type Item = Result<MultiPointCollection>;
287

288
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
34✔
289
        // TODO: handle lock poisoning on multiple occasions
290

291
        if self.thread_is_computing.load(Ordering::Relaxed) {
34✔
292
            return Poll::Pending;
×
293
        }
34✔
294

295
        let mut poll_result = safe_lock_mutex(&self.poll_result);
34✔
296
        if let Some(x) = poll_result.take() {
34✔
297
            return Poll::Ready(x);
17✔
298
        }
17✔
299

300
        self.thread_is_computing.store(true, Ordering::Relaxed);
17✔
301

302
        let is_working = self.thread_is_computing.clone();
17✔
303
        let reader_state = self.reader_state.clone();
17✔
304
        let poll_result = self.poll_result.clone();
17✔
305

306
        let bbox = self.bbox;
17✔
307
        let chunk_size = self.chunk_size;
17✔
308
        let parameters = self.parameters.clone();
17✔
309
        let waker = cx.waker().clone();
17✔
310

311
        crate::util::spawn_blocking(move || {
17✔
312
            let mut csv_reader = safe_lock_mutex(&reader_state);
17✔
313
            let computation_result = || -> Result<Option<MultiPointCollection>> {
17✔
314
                // TODO: is clone necessary?
315
                let geometry_specification = parameters.geometry.clone();
17✔
316
                csv_reader.setup_once(geometry_specification)?;
17✔
317

318
                let (header, records) = match &mut *csv_reader {
16✔
319
                    ReaderState::OnGoing { header, records } => (header, records),
15✔
320
                    ReaderState::Error => return Ok(None),
1✔
321
                    ReaderState::Untouched(_) => unreachable!(),
×
322
                };
323

324
                let mut builder = MultiPointCollection::builder().finish_header();
15✔
325
                let mut number_of_entries = 0; // TODO: add size/len to builder
15✔
326

327
                while number_of_entries < chunk_size {
32✔
328
                    let Some(record) = records.next() else {
29✔
329
                        break;
11✔
330
                    };
331

332
                    let row = record.context(error::CsvSourceReader)?;
18✔
333
                    let parsed_row = CsvSourceStream::parse_row(header, &row)?;
17✔
334

335
                    // TODO: filter time
336
                    if bbox.contains_coordinate(&parsed_row.coordinate) {
17✔
337
                        builder.push_geometry(parsed_row.coordinate.into());
16✔
338
                        builder.push_time_interval(parsed_row.time_interval);
16✔
339
                        builder.finish_row();
16✔
340

16✔
341
                        number_of_entries += 1;
16✔
342
                    }
16✔
343
                }
344

345
                // TODO: is this the correct cancellation criterion?
346
                if number_of_entries > 0 {
14✔
347
                    let collection = builder.build()?;
8✔
348
                    Ok(Some(collection))
8✔
349
                } else {
350
                    Ok(None)
6✔
351
                }
352
            }();
17✔
353

354
            *safe_lock_mutex(&poll_result) = Some(match computation_result {
17✔
355
                Ok(Some(collection)) => Some(Ok(collection)),
8✔
356
                Ok(None) => None,
7✔
357
                Err(e) => Some(Err(e)),
2✔
358
            });
359
            is_working.store(false, Ordering::Relaxed);
17✔
360

361
            waker.wake();
17✔
362
        });
17✔
363

364
        Poll::Pending
17✔
365
    }
34✔
366
}
367

368
#[derive(Debug)]
369
struct CsvSourceProcessor {
370
    params: CsvSourceParameters,
371
    result_descriptor: VectorResultDescriptor,
372
}
373

374
#[async_trait]
375
impl QueryProcessor for CsvSourceProcessor {
376
    type Output = MultiPointCollection;
377
    type SpatialBounds = BoundingBox2D;
378
    type Selection = ColumnSelection;
379
    type ResultDescription = VectorResultDescriptor;
380

381
    async fn _query<'a>(
382
        &'a self,
383
        query: VectorQueryRectangle,
384
        _ctx: &'a dyn QueryContext,
385
    ) -> Result<BoxStream<'a, Result<Self::Output>>> {
8✔
386
        // TODO: properly handle chunk_size
387
        Ok(CsvSourceStream::new(self.params.clone(), query.spatial_bounds, 10)?.boxed())
4✔
388
    }
8✔
389

390
    fn result_descriptor(&self) -> &VectorResultDescriptor {
13✔
391
        &self.result_descriptor
13✔
392
    }
13✔
393
}
394

395
#[derive(Clone, Copy, Debug)]
396
struct ParsedHeader {
397
    pub has_header: bool,
398
    pub x_index: usize,
399
    pub y_index: usize,
400
}
401

402
struct ParsedRow {
403
    pub coordinate: Coordinate2D,
404
    pub time_interval: TimeInterval,
405
    // TODO: fields
406
}
407

408
#[cfg(test)]
409
mod tests {
410
    use std::io::{Seek, SeekFrom, Write};
411

412
    use geoengine_datatypes::primitives::SpatialResolution;
413

414
    use super::*;
415
    use crate::engine::MockQueryContext;
416
    use geoengine_datatypes::collections::{FeatureCollectionInfos, ToGeoJson};
417

418
    #[test]
419
    fn it_deserializes() {
1✔
420
        let json_string = r#"
1✔
421
            {
1✔
422
                "type": "CsvSource",
1✔
423
                "params": {
1✔
424
                    "filePath": "/foo/bar.csv",
1✔
425
                    "fieldSeparator": ",",
1✔
426
                    "geometry": {
1✔
427
                        "type": "xy",
1✔
428
                        "x": "x",
1✔
429
                        "y": "y"
1✔
430
                    }
1✔
431
                }
1✔
432
            }"#;
1✔
433

434
        let operator: CsvSource = serde_json::from_str(json_string).unwrap();
1✔
435

436
        assert_eq!(
1✔
437
            operator,
438
            CsvSource {
1✔
439
                params: CsvSourceParameters {
1✔
440
                    file_path: "/foo/bar.csv".into(),
1✔
441
                    field_separator: ',',
1✔
442
                    geometry: CsvGeometrySpecification::XY {
1✔
443
                        x: "x".into(),
1✔
444
                        y: "y".into()
1✔
445
                    },
1✔
446
                    time: CsvTimeSpecification::None,
1✔
447
                },
1✔
448
            }
1✔
449
        );
450
    }
1✔
451

452
    #[tokio::test]
453
    async fn read_points() {
1✔
454
        let mut fake_file = tempfile::NamedTempFile::new().unwrap();
1✔
455
        write!(
1✔
456
            fake_file,
1✔
457
            "\
1✔
458
x,y
1✔
459
0,1
1✔
460
2,3
1✔
461
4,5
1✔
462
"
1✔
463
        )
464
        .unwrap();
1✔
465
        fake_file.seek(SeekFrom::Start(0)).unwrap();
1✔
466

467
        let mut csv_source = CsvSourceStream::new(
1✔
468
            CsvSourceParameters {
1✔
469
                file_path: fake_file.path().into(),
1✔
470
                field_separator: ',',
1✔
471
                geometry: CsvGeometrySpecification::XY {
1✔
472
                    x: "x".into(),
1✔
473
                    y: "y".into(),
1✔
474
                },
1✔
475
                time: CsvTimeSpecification::None,
1✔
476
            },
1✔
477
            BoundingBox2D::new_unchecked((0., 0.).into(), (5., 5.).into()),
1✔
478
            2,
479
        )
480
        .unwrap();
1✔
481

482
        assert_eq!(csv_source.next().await.unwrap().unwrap().len(), 2);
1✔
483
        assert_eq!(csv_source.next().await.unwrap().unwrap().len(), 1);
1✔
484
        assert!(csv_source.next().await.is_none());
1✔
485
    }
1✔
486

487
    #[tokio::test]
488
    async fn erroneous_point_rows() {
1✔
489
        let mut fake_file = tempfile::NamedTempFile::new().unwrap();
1✔
490
        write!(
1✔
491
            fake_file,
1✔
492
            "\
1✔
493
x,y
1✔
494
0,1
1✔
495
CORRUPT
1✔
496
4,5
1✔
497
"
1✔
498
        )
499
        .unwrap();
1✔
500
        fake_file.seek(SeekFrom::Start(0)).unwrap();
1✔
501

502
        let mut csv_source = CsvSourceStream::new(
1✔
503
            CsvSourceParameters {
1✔
504
                file_path: fake_file.path().into(),
1✔
505
                field_separator: ',',
1✔
506
                geometry: CsvGeometrySpecification::XY {
1✔
507
                    x: "x".into(),
1✔
508
                    y: "y".into(),
1✔
509
                },
1✔
510
                time: CsvTimeSpecification::None,
1✔
511
            },
1✔
512
            BoundingBox2D::new_unchecked((0., 0.).into(), (5., 5.).into()),
1✔
513
            1,
514
        )
515
        .unwrap();
1✔
516

517
        assert_eq!(csv_source.next().await.unwrap().unwrap().len(), 1);
1✔
518
        assert!(csv_source.next().await.unwrap().is_err());
1✔
519
        assert_eq!(csv_source.next().await.unwrap().unwrap().len(), 1);
1✔
520
        assert!(csv_source.next().await.is_none());
1✔
521
    }
1✔
522

523
    #[tokio::test]
524
    async fn corrupt_point_header() {
1✔
525
        let mut fake_file = tempfile::NamedTempFile::new().unwrap();
1✔
526
        write!(
1✔
527
            fake_file,
1✔
528
            "\
1✔
529
x,z
1✔
530
0,1
1✔
531
2,3
1✔
532
4,5
1✔
533
"
1✔
534
        )
535
        .unwrap();
1✔
536
        fake_file.seek(SeekFrom::Start(0)).unwrap();
1✔
537

538
        let mut csv_source = CsvSourceStream::new(
1✔
539
            CsvSourceParameters {
1✔
540
                file_path: fake_file.path().into(),
1✔
541
                field_separator: ',',
1✔
542
                geometry: CsvGeometrySpecification::XY {
1✔
543
                    x: "x".into(),
1✔
544
                    y: "y".into(),
1✔
545
                },
1✔
546
                time: CsvTimeSpecification::None,
1✔
547
            },
1✔
548
            BoundingBox2D::new_unchecked((0., 0.).into(), (5., 5.).into()),
1✔
549
            1,
550
        )
551
        .unwrap();
1✔
552

553
        assert!(csv_source.next().await.unwrap().is_err());
1✔
554
        assert!(csv_source.next().await.is_none());
1✔
555
    }
1✔
556

557
    #[tokio::test]
558
    async fn processor() {
1✔
559
        let mut fake_file = tempfile::NamedTempFile::new().unwrap();
1✔
560
        write!(
1✔
561
            fake_file,
1✔
562
            "\
1✔
563
x,y
1✔
564
0,1
1✔
565
2,3
1✔
566
4,5
1✔
567
"
1✔
568
        )
569
        .unwrap();
1✔
570
        fake_file.seek(SeekFrom::Start(0)).unwrap();
1✔
571

572
        let params = CsvSourceParameters {
1✔
573
            file_path: fake_file.path().into(),
1✔
574
            field_separator: ',',
1✔
575
            geometry: CsvGeometrySpecification::XY {
1✔
576
                x: "x".into(),
1✔
577
                y: "y".into(),
1✔
578
            },
1✔
579
            time: CsvTimeSpecification::None,
1✔
580
        };
1✔
581

582
        let p = CsvSourceProcessor {
1✔
583
            params,
1✔
584
            result_descriptor: VectorResultDescriptor {
1✔
585
                data_type: VectorDataType::MultiPoint,
1✔
586
                spatial_reference: SpatialReference::epsg_4326().into(),
1✔
587
                columns: Default::default(),
1✔
588
                time: None,
1✔
589
                bbox: None,
1✔
590
            },
1✔
591
        };
1✔
592

593
        let query = VectorQueryRectangle {
1✔
594
            spatial_bounds: BoundingBox2D::new_unchecked(
1✔
595
                Coordinate2D::new(0., 0.),
1✔
596
                Coordinate2D::new(3., 3.),
1✔
597
            ),
1✔
598
            time_interval: TimeInterval::new_unchecked(0, 1),
1✔
599
            spatial_resolution: SpatialResolution::zero_point_one(),
1✔
600
            attributes: ColumnSelection::all(),
1✔
601
        };
1✔
602
        let ctx = MockQueryContext::new((10 * 8 * 2).into());
1✔
603

604
        let r: Vec<Result<MultiPointCollection>> =
1✔
605
            p.query(query, &ctx).await.unwrap().collect().await;
1✔
606

607
        assert_eq!(r.len(), 1);
1✔
608

609
        assert_eq!(
1✔
610
            serde_json::from_str::<serde_json::Value>(&r[0].as_ref().unwrap().to_geo_json())
1✔
611
                .unwrap(),
1✔
612
            serde_json::json!({
1✔
613
                "type": "FeatureCollection",
1✔
614
                "features": [{
1✔
615
                    "type": "Feature",
1✔
616
                    "geometry": {
1✔
617
                        "type": "Point",
1✔
618
                        "coordinates": [0.0, 1.0]
1✔
619
                    },
1✔
620
                    "properties": {},
1✔
621
                    "when": {
1✔
622
                        "start": "-262143-01-01T00:00:00+00:00",
1✔
623
                        "end": "+262142-12-31T23:59:59.999+00:00",
1✔
624
                        "type": "Interval"
1✔
625
                    }
1✔
626
                }, {
1✔
627
                    "type": "Feature",
1✔
628
                    "geometry": {
1✔
629
                        "type": "Point",
1✔
630
                        "coordinates": [2.0, 3.0]
1✔
631
                    },
1✔
632
                    "properties": {},
1✔
633
                    "when": {
1✔
634
                        "start": "-262143-01-01T00:00:00+00:00",
1✔
635
                        "end": "+262142-12-31T23:59:59.999+00:00",
1✔
636
                        "type": "Interval"
1✔
637
                    }
1✔
638
                }]
1✔
639
            })
1✔
640
        );
1✔
641
    }
1✔
642

643
    #[test]
644
    fn operator() {
1✔
645
        let mut temp_file = tempfile::NamedTempFile::new().unwrap();
1✔
646
        write!(
1✔
647
            temp_file,
1✔
648
            "\
1✔
649
x;y
1✔
650
0;1
1✔
651
2;3
1✔
652
4;5
1✔
653
"
1✔
654
        )
655
        .unwrap();
1✔
656
        temp_file.seek(SeekFrom::Start(0)).unwrap();
1✔
657

658
        let params = CsvSourceParameters {
1✔
659
            file_path: temp_file.path().into(),
1✔
660
            field_separator: ';',
1✔
661
            geometry: CsvGeometrySpecification::XY {
1✔
662
                x: "x".into(),
1✔
663
                y: "y".into(),
1✔
664
            },
1✔
665
            time: CsvTimeSpecification::None,
1✔
666
        };
1✔
667

668
        let operator = CsvSource { params }.boxed();
1✔
669

670
        let operator_json = serde_json::to_value(&operator).unwrap();
1✔
671

672
        assert_eq!(
1✔
673
            operator_json,
674
            serde_json::json!({
1✔
675
                "type": "CsvSource",
1✔
676
                "params": {
1✔
677
                    "filePath": temp_file.path(),
1✔
678
                    "fieldSeparator": ";",
1✔
679
                    "geometry": {
1✔
680
                        "type": "xy",
1✔
681
                        "x": "x",
1✔
682
                        "y": "y"
1✔
683
                    },
684
                    "time": "None"
1✔
685
                }
686
            })
687
        );
688

689
        let _operator: Box<dyn VectorOperator> = serde_json::from_value(operator_json).unwrap();
1✔
690
    }
1✔
691
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc