• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 12469296660

23 Dec 2024 03:15PM UTC coverage: 90.56% (-0.1%) from 90.695%
12469296660

push

github

web-flow
Merge pull request #998 from geo-engine/quota_log_wip

Quota and Data usage Logging

859 of 1214 new or added lines in 66 files covered. (70.76%)

3 existing lines in 2 files now uncovered.

133923 of 147883 relevant lines covered (90.56%)

54439.32 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.52
/operators/src/source/csv.rs
1
use std::path::PathBuf;
2
use std::pin::Pin;
3
use std::sync::{Arc, Mutex};
4
use std::{fs::File, sync::atomic::AtomicBool};
5

6
use csv::{Position, Reader, StringRecord};
7
use futures::stream::BoxStream;
8
use futures::task::{Context, Poll};
9
use futures::{Stream, StreamExt};
10
use geoengine_datatypes::dataset::NamedData;
11
use geoengine_datatypes::primitives::{ColumnSelection, VectorQueryRectangle};
12
use serde::{Deserialize, Serialize};
13
use snafu::{ensure, OptionExt, ResultExt};
14

15
use geoengine_datatypes::collections::{
16
    BuilderProvider, GeoFeatureCollectionRowBuilder, MultiPointCollection, VectorDataType,
17
};
18
use geoengine_datatypes::{
19
    primitives::{BoundingBox2D, Coordinate2D, TimeInterval},
20
    spatial_reference::SpatialReference,
21
};
22

23
use crate::engine::{
24
    CanonicOperatorName, InitializedVectorOperator, OperatorData, OperatorName, QueryContext,
25
    SourceOperator, TypedVectorQueryProcessor, VectorOperator, VectorQueryProcessor,
26
    VectorResultDescriptor,
27
};
28
use crate::engine::{QueryProcessor, WorkflowOperatorPath};
29
use crate::error;
30
use crate::util::{safe_lock_mutex, Result};
31
use async_trait::async_trait;
32
use std::sync::atomic::Ordering;
33

34
/// Parameters for the CSV Source Operator
35
///
36
/// # Examples
37
///
38
/// ```rust
39
/// use serde_json::{Result, Value};
40
/// use geoengine_operators::source::{CsvSourceParameters, CsvSource};
41
/// use geoengine_operators::source::{CsvGeometrySpecification, CsvTimeSpecification};
42
///
43
/// let json_string = r#"
44
///     {
45
///         "type": "CsvSource",
46
///         "params": {
47
///             "filePath": "/foo/bar.csv",
48
///             "fieldSeparator": ",",
49
///             "geometry": {
50
///                 "type": "xy",
51
///                 "x": "x",
52
///                 "y": "y"
53
///             }
54
///         }
55
///     }"#;
56
///
57
/// let operator: CsvSource = serde_json::from_str(json_string).unwrap();
58
///
59
/// assert_eq!(operator, CsvSource {
60
///     params: CsvSourceParameters {
61
///         file_path: "/foo/bar.csv".into(),
62
///         field_separator: ',',
63
///         geometry: CsvGeometrySpecification::XY { x: "x".into(), y: "y".into() },
64
///         time: CsvTimeSpecification::None,
65
///     },
66
/// });
67
/// ```
68
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone)]
69
#[serde(rename_all = "camelCase")]
70
pub struct CsvSourceParameters {
71
    pub file_path: PathBuf,
72
    pub field_separator: char,
73
    pub geometry: CsvGeometrySpecification,
74
    #[serde(default)]
75
    pub time: CsvTimeSpecification,
76
}
77

78
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone)]
79
#[serde(tag = "type", rename_all = "lowercase")]
80
pub enum CsvGeometrySpecification {
81
    #[allow(clippy::upper_case_acronyms)]
82
    XY { x: String, y: String },
83
}
84

85
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone)]
86
pub enum CsvTimeSpecification {
87
    None,
88
}
89

90
impl Default for CsvTimeSpecification {
91
    fn default() -> Self {
×
92
        Self::None
×
93
    }
×
94
}
95

96
enum ReaderState {
97
    Untouched(Reader<File>),
98
    OnGoing {
99
        header: ParsedHeader,
100
        records: csv::StringRecordsIntoIter<File>,
101
    },
102
    Error,
103
}
104

105
impl ReaderState {
106
    pub fn setup_once(&mut self, geometry_specification: CsvGeometrySpecification) -> Result<()> {
17✔
107
        if let ReaderState::Untouched(..) = self {
17✔
108
            // pass
7✔
109
        } else {
7✔
110
            return Ok(());
10✔
111
        }
112

113
        let old_state = std::mem::replace(self, ReaderState::Error);
7✔
114

115
        if let ReaderState::Untouched(mut csv_reader) = old_state {
7✔
116
            let header = match CsvSourceStream::setup_read(geometry_specification, &mut csv_reader)
7✔
117
            {
118
                Ok(header) => header,
6✔
119
                Err(error) => return Err(error),
1✔
120
            };
121

122
            let mut records = csv_reader.into_records();
6✔
123

6✔
124
            // consume the first row, which is the header
6✔
125
            if header.has_header {
6✔
126
                // TODO: throw error
6✔
127
                records.next();
6✔
128
            }
6✔
129

130
            *self = ReaderState::OnGoing { header, records }
6✔
131
        }
×
132

133
        Ok(())
6✔
134
    }
17✔
135
}
136

137
pub struct CsvSourceStream {
138
    parameters: CsvSourceParameters,
139
    bbox: BoundingBox2D,
140
    chunk_size: usize,
141
    reader_state: Arc<Mutex<ReaderState>>,
142
    thread_is_computing: Arc<AtomicBool>,
143
    #[allow(clippy::option_option)]
144
    poll_result: Arc<Mutex<Option<Option<Result<MultiPointCollection>>>>>,
145
}
146

147
pub type CsvSource = SourceOperator<CsvSourceParameters>;
148

149
impl OperatorName for CsvSource {
150
    const TYPE_NAME: &'static str = "CsvSource";
151
}
152

153
impl OperatorData for CsvSourceParameters {
154
    fn data_names_collect(&self, _data_names: &mut Vec<NamedData>) {}
×
155
}
156

157
#[typetag::serde]
5✔
158
#[async_trait]
159
impl VectorOperator for CsvSource {
160
    async fn _initialize(
161
        self: Box<Self>,
162
        path: WorkflowOperatorPath,
163
        _context: &dyn crate::engine::ExecutionContext,
164
    ) -> Result<Box<dyn InitializedVectorOperator>> {
4✔
165
        let initialized_source = InitializedCsvSource {
4✔
166
            name: CanonicOperatorName::from(&self),
4✔
167
            path,
4✔
168
            result_descriptor: VectorResultDescriptor {
4✔
169
                data_type: VectorDataType::MultiPoint, // TODO: get as user input
4✔
170
                spatial_reference: SpatialReference::epsg_4326().into(), // TODO: get as user input
4✔
171
                columns: Default::default(), // TODO: get when source allows loading other columns
4✔
172
                time: None,
4✔
173
                bbox: None,
4✔
174
            },
4✔
175
            state: self.params,
4✔
176
        };
4✔
177

4✔
178
        Ok(initialized_source.boxed())
4✔
179
    }
8✔
180

181
    span_fn!(CsvSource);
182
}
183

184
pub struct InitializedCsvSource {
185
    name: CanonicOperatorName,
186
    path: WorkflowOperatorPath,
187
    result_descriptor: VectorResultDescriptor,
188
    state: CsvSourceParameters,
189
}
190

191
impl InitializedVectorOperator for InitializedCsvSource {
192
    fn query_processor(&self) -> Result<crate::engine::TypedVectorQueryProcessor> {
3✔
193
        Ok(TypedVectorQueryProcessor::MultiPoint(
3✔
194
            CsvSourceProcessor {
3✔
195
                params: self.state.clone(),
3✔
196
                result_descriptor: self.result_descriptor.clone(),
3✔
197
            }
3✔
198
            .boxed(),
3✔
199
        ))
3✔
200
    }
3✔
201

202
    fn result_descriptor(&self) -> &VectorResultDescriptor {
4✔
203
        &self.result_descriptor
4✔
204
    }
4✔
205

206
    fn canonic_name(&self) -> CanonicOperatorName {
×
207
        self.name.clone()
×
208
    }
×
209

NEW
210
    fn name(&self) -> &'static str {
×
NEW
211
        CsvSource::TYPE_NAME
×
NEW
212
    }
×
213

NEW
214
    fn path(&self) -> WorkflowOperatorPath {
×
NEW
215
        self.path.clone()
×
NEW
216
    }
×
217
}
218

219
impl CsvSourceStream {
220
    /// Creates a new `CsvSource`
221
    ///
222
    /// # Errors
223
    ///
224
    /// This constructor fails if the delimiter is not an ASCII character.
225
    /// Furthermore, there are IO errors from the reader.
226
    ///
227
    // TODO: include time interval, e.g. QueryRectangle parameter
228
    pub fn new(
7✔
229
        parameters: CsvSourceParameters,
7✔
230
        bbox: BoundingBox2D,
7✔
231
        chunk_size: usize,
7✔
232
    ) -> Result<Self> {
7✔
233
        ensure!(
7✔
234
            parameters.field_separator.is_ascii(),
7✔
235
            error::CsvSource {
×
236
                details: "Delimiter must be ASCII character"
×
237
            }
×
238
        );
239

240
        Ok(Self {
241
            reader_state: Arc::new(Mutex::new(ReaderState::Untouched(
242
                csv::ReaderBuilder::new()
7✔
243
                    .delimiter(parameters.field_separator as u8)
7✔
244
                    .has_headers(true)
7✔
245
                    .from_path(parameters.file_path.as_path())
7✔
246
                    .context(error::CsvSourceReader {})?,
7✔
247
            ))),
248
            thread_is_computing: Arc::new(AtomicBool::new(false)),
7✔
249
            poll_result: Arc::new(Mutex::new(None)),
7✔
250
            parameters,
7✔
251
            bbox,
7✔
252
            chunk_size,
7✔
253
        })
254
    }
7✔
255

256
    fn setup_read(
7✔
257
        geometry_specification: CsvGeometrySpecification,
7✔
258
        csv_reader: &mut Reader<File>,
7✔
259
    ) -> Result<ParsedHeader> {
7✔
260
        csv_reader
7✔
261
            .seek(Position::new())
7✔
262
            .context(error::CsvSourceReader {})?; // start at beginning
7✔
263

264
        ensure!(
7✔
265
            csv_reader.has_headers(),
7✔
266
            error::CsvSource {
×
267
                details: "CSV file must contain header",
×
268
            }
×
269
        );
270

271
        let header = csv_reader.headers().context(error::CsvSourceReader)?;
7✔
272

273
        let CsvGeometrySpecification::XY { x, y } = geometry_specification;
7✔
274
        let x_index = header
7✔
275
            .iter()
7✔
276
            .position(|v| v == x)
7✔
277
            .context(error::CsvSource {
7✔
278
                details: "Cannot find x index in csv header",
7✔
279
            })?;
7✔
280
        let y_index = header
7✔
281
            .iter()
7✔
282
            .position(|v| v == y)
14✔
283
            .context(error::CsvSource {
7✔
284
                details: "Cannot find y index in csv header",
7✔
285
            })?;
7✔
286

287
        Ok(ParsedHeader {
6✔
288
            has_header: true,
6✔
289
            x_index,
6✔
290
            y_index,
6✔
291
        })
6✔
292
    }
7✔
293

294
    /// Parse a single CSV row
295
    fn parse_row(header: &ParsedHeader, row: &StringRecord) -> Result<ParsedRow> {
17✔
296
        let x: f64 = row
17✔
297
            .get(header.x_index)
17✔
298
            .context(error::CsvSource {
17✔
299
                details: "Cannot find x index key",
17✔
300
            })?
17✔
301
            .parse()
17✔
302
            .map_err(|_error| error::Error::CsvSource {
17✔
303
                details: "Cannot parse x coordinate".to_string(),
×
304
            })?;
17✔
305
        let y: f64 = row
17✔
306
            .get(header.y_index)
17✔
307
            .context(error::CsvSource {
17✔
308
                details: "Cannot find y index key",
17✔
309
            })?
17✔
310
            .parse()
17✔
311
            .map_err(|_error| error::Error::CsvSource {
17✔
312
                details: "Cannot parse y coordinate".to_string(),
×
313
            })?;
17✔
314

315
        Ok(ParsedRow {
17✔
316
            coordinate: (x, y).into(),
17✔
317
            time_interval: TimeInterval::default(),
17✔
318
        })
17✔
319
    }
17✔
320
}
321

322
impl Stream for CsvSourceStream {
323
    type Item = Result<MultiPointCollection>;
324

325
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
34✔
326
        // TODO: handle lock poisoning on multiple occasions
34✔
327

34✔
328
        if self.thread_is_computing.load(Ordering::Relaxed) {
34✔
329
            return Poll::Pending;
×
330
        }
34✔
331

34✔
332
        let mut poll_result = safe_lock_mutex(&self.poll_result);
34✔
333
        if let Some(x) = poll_result.take() {
34✔
334
            return Poll::Ready(x);
17✔
335
        }
17✔
336

17✔
337
        self.thread_is_computing.store(true, Ordering::Relaxed);
17✔
338

17✔
339
        let is_working = self.thread_is_computing.clone();
17✔
340
        let reader_state = self.reader_state.clone();
17✔
341
        let poll_result = self.poll_result.clone();
17✔
342

17✔
343
        let bbox = self.bbox;
17✔
344
        let chunk_size = self.chunk_size;
17✔
345
        let parameters = self.parameters.clone();
17✔
346
        let waker = cx.waker().clone();
17✔
347

17✔
348
        crate::util::spawn_blocking(move || {
17✔
349
            let mut csv_reader = safe_lock_mutex(&reader_state);
17✔
350
            let computation_result = || -> Result<Option<MultiPointCollection>> {
17✔
351
                // TODO: is clone necessary?
17✔
352
                let geometry_specification = parameters.geometry.clone();
17✔
353
                csv_reader.setup_once(geometry_specification)?;
17✔
354

355
                let (header, records) = match &mut *csv_reader {
16✔
356
                    ReaderState::OnGoing { header, records } => (header, records),
15✔
357
                    ReaderState::Error => return Ok(None),
1✔
358
                    ReaderState::Untouched(_) => unreachable!(),
×
359
                };
360

361
                let mut builder = MultiPointCollection::builder().finish_header();
15✔
362
                let mut number_of_entries = 0; // TODO: add size/len to builder
15✔
363

364
                while number_of_entries < chunk_size {
32✔
365
                    let Some(record) = records.next() else {
29✔
366
                        break;
11✔
367
                    };
368

369
                    let row = record.context(error::CsvSourceReader)?;
18✔
370
                    let parsed_row = CsvSourceStream::parse_row(header, &row)?;
17✔
371

372
                    // TODO: filter time
373
                    if bbox.contains_coordinate(&parsed_row.coordinate) {
17✔
374
                        builder.push_geometry(parsed_row.coordinate.into());
16✔
375
                        builder.push_time_interval(parsed_row.time_interval);
16✔
376
                        builder.finish_row();
16✔
377

16✔
378
                        number_of_entries += 1;
16✔
379
                    }
16✔
380
                }
381

382
                // TODO: is this the correct cancellation criterion?
383
                if number_of_entries > 0 {
14✔
384
                    let collection = builder.build()?;
8✔
385
                    Ok(Some(collection))
8✔
386
                } else {
387
                    Ok(None)
6✔
388
                }
389
            }();
17✔
390

391
            *safe_lock_mutex(&poll_result) = Some(match computation_result {
17✔
392
                Ok(Some(collection)) => Some(Ok(collection)),
8✔
393
                Ok(None) => None,
7✔
394
                Err(e) => Some(Err(e)),
2✔
395
            });
396
            is_working.store(false, Ordering::Relaxed);
17✔
397

17✔
398
            waker.wake();
17✔
399
        });
17✔
400

17✔
401
        Poll::Pending
17✔
402
    }
34✔
403
}
404

405
#[derive(Debug)]
406
struct CsvSourceProcessor {
407
    params: CsvSourceParameters,
408
    result_descriptor: VectorResultDescriptor,
409
}
410

411
#[async_trait]
412
impl QueryProcessor for CsvSourceProcessor {
413
    type Output = MultiPointCollection;
414
    type SpatialBounds = BoundingBox2D;
415
    type Selection = ColumnSelection;
416
    type ResultDescription = VectorResultDescriptor;
417

418
    async fn _query<'a>(
419
        &'a self,
420
        query: VectorQueryRectangle,
421
        _ctx: &'a dyn QueryContext,
422
    ) -> Result<BoxStream<'a, Result<Self::Output>>> {
4✔
423
        // TODO: properly handle chunk_size
424
        Ok(CsvSourceStream::new(self.params.clone(), query.spatial_bounds, 10)?.boxed())
4✔
425
    }
8✔
426

427
    fn result_descriptor(&self) -> &VectorResultDescriptor {
7✔
428
        &self.result_descriptor
7✔
429
    }
7✔
430
}
431

432
#[derive(Clone, Copy, Debug)]
433
struct ParsedHeader {
434
    pub has_header: bool,
435
    pub x_index: usize,
436
    pub y_index: usize,
437
}
438

439
struct ParsedRow {
440
    pub coordinate: Coordinate2D,
441
    pub time_interval: TimeInterval,
442
    // TODO: fields
443
}
444

445
#[cfg(test)]
446
mod tests {
447
    use std::io::{Seek, SeekFrom, Write};
448

449
    use geoengine_datatypes::primitives::SpatialResolution;
450

451
    use super::*;
452
    use crate::engine::MockQueryContext;
453
    use geoengine_datatypes::collections::{FeatureCollectionInfos, ToGeoJson};
454

455
    #[tokio::test]
456
    async fn read_points() {
1✔
457
        let mut fake_file = tempfile::NamedTempFile::new().unwrap();
1✔
458
        write!(
1✔
459
            fake_file,
1✔
460
            "\
1✔
461
x,y
1✔
462
0,1
1✔
463
2,3
1✔
464
4,5
1✔
465
"
1✔
466
        )
1✔
467
        .unwrap();
1✔
468
        fake_file.seek(SeekFrom::Start(0)).unwrap();
1✔
469

1✔
470
        let mut csv_source = CsvSourceStream::new(
1✔
471
            CsvSourceParameters {
1✔
472
                file_path: fake_file.path().into(),
1✔
473
                field_separator: ',',
1✔
474
                geometry: CsvGeometrySpecification::XY {
1✔
475
                    x: "x".into(),
1✔
476
                    y: "y".into(),
1✔
477
                },
1✔
478
                time: CsvTimeSpecification::None,
1✔
479
            },
1✔
480
            BoundingBox2D::new_unchecked((0., 0.).into(), (5., 5.).into()),
1✔
481
            2,
1✔
482
        )
1✔
483
        .unwrap();
1✔
484

1✔
485
        assert_eq!(csv_source.next().await.unwrap().unwrap().len(), 2);
1✔
486
        assert_eq!(csv_source.next().await.unwrap().unwrap().len(), 1);
1✔
487
        assert!(csv_source.next().await.is_none());
1✔
488
    }
1✔
489

490
    #[tokio::test]
491
    async fn erroneous_point_rows() {
1✔
492
        let mut fake_file = tempfile::NamedTempFile::new().unwrap();
1✔
493
        write!(
1✔
494
            fake_file,
1✔
495
            "\
1✔
496
x,y
1✔
497
0,1
1✔
498
CORRUPT
1✔
499
4,5
1✔
500
"
1✔
501
        )
1✔
502
        .unwrap();
1✔
503
        fake_file.seek(SeekFrom::Start(0)).unwrap();
1✔
504

1✔
505
        let mut csv_source = CsvSourceStream::new(
1✔
506
            CsvSourceParameters {
1✔
507
                file_path: fake_file.path().into(),
1✔
508
                field_separator: ',',
1✔
509
                geometry: CsvGeometrySpecification::XY {
1✔
510
                    x: "x".into(),
1✔
511
                    y: "y".into(),
1✔
512
                },
1✔
513
                time: CsvTimeSpecification::None,
1✔
514
            },
1✔
515
            BoundingBox2D::new_unchecked((0., 0.).into(), (5., 5.).into()),
1✔
516
            1,
1✔
517
        )
1✔
518
        .unwrap();
1✔
519

1✔
520
        assert_eq!(csv_source.next().await.unwrap().unwrap().len(), 1);
1✔
521
        assert!(csv_source.next().await.unwrap().is_err());
1✔
522
        assert_eq!(csv_source.next().await.unwrap().unwrap().len(), 1);
1✔
523
        assert!(csv_source.next().await.is_none());
1✔
524
    }
1✔
525

526
    #[tokio::test]
527
    async fn corrupt_point_header() {
1✔
528
        let mut fake_file = tempfile::NamedTempFile::new().unwrap();
1✔
529
        write!(
1✔
530
            fake_file,
1✔
531
            "\
1✔
532
x,z
1✔
533
0,1
1✔
534
2,3
1✔
535
4,5
1✔
536
"
1✔
537
        )
1✔
538
        .unwrap();
1✔
539
        fake_file.seek(SeekFrom::Start(0)).unwrap();
1✔
540

1✔
541
        let mut csv_source = CsvSourceStream::new(
1✔
542
            CsvSourceParameters {
1✔
543
                file_path: fake_file.path().into(),
1✔
544
                field_separator: ',',
1✔
545
                geometry: CsvGeometrySpecification::XY {
1✔
546
                    x: "x".into(),
1✔
547
                    y: "y".into(),
1✔
548
                },
1✔
549
                time: CsvTimeSpecification::None,
1✔
550
            },
1✔
551
            BoundingBox2D::new_unchecked((0., 0.).into(), (5., 5.).into()),
1✔
552
            1,
1✔
553
        )
1✔
554
        .unwrap();
1✔
555

1✔
556
        assert!(csv_source.next().await.unwrap().is_err());
1✔
557
        assert!(csv_source.next().await.is_none());
1✔
558
    }
1✔
559

560
    #[tokio::test]
561
    async fn processor() {
1✔
562
        let mut fake_file = tempfile::NamedTempFile::new().unwrap();
1✔
563
        write!(
1✔
564
            fake_file,
1✔
565
            "\
1✔
566
x,y
1✔
567
0,1
1✔
568
2,3
1✔
569
4,5
1✔
570
"
1✔
571
        )
1✔
572
        .unwrap();
1✔
573
        fake_file.seek(SeekFrom::Start(0)).unwrap();
1✔
574

1✔
575
        let params = CsvSourceParameters {
1✔
576
            file_path: fake_file.path().into(),
1✔
577
            field_separator: ',',
1✔
578
            geometry: CsvGeometrySpecification::XY {
1✔
579
                x: "x".into(),
1✔
580
                y: "y".into(),
1✔
581
            },
1✔
582
            time: CsvTimeSpecification::None,
1✔
583
        };
1✔
584

1✔
585
        let p = CsvSourceProcessor {
1✔
586
            params,
1✔
587
            result_descriptor: VectorResultDescriptor {
1✔
588
                data_type: VectorDataType::MultiPoint,
1✔
589
                spatial_reference: SpatialReference::epsg_4326().into(),
1✔
590
                columns: Default::default(),
1✔
591
                time: None,
1✔
592
                bbox: None,
1✔
593
            },
1✔
594
        };
1✔
595

1✔
596
        let query = VectorQueryRectangle {
1✔
597
            spatial_bounds: BoundingBox2D::new_unchecked(
1✔
598
                Coordinate2D::new(0., 0.),
1✔
599
                Coordinate2D::new(3., 3.),
1✔
600
            ),
1✔
601
            time_interval: TimeInterval::new_unchecked(0, 1),
1✔
602
            spatial_resolution: SpatialResolution::zero_point_one(),
1✔
603
            attributes: ColumnSelection::all(),
1✔
604
        };
1✔
605
        let ctx = MockQueryContext::new((10 * 8 * 2).into());
1✔
606

1✔
607
        let r: Vec<Result<MultiPointCollection>> =
1✔
608
            p.query(query, &ctx).await.unwrap().collect().await;
1✔
609

1✔
610
        assert_eq!(r.len(), 1);
1✔
611

1✔
612
        assert_eq!(
1✔
613
            r[0].as_ref().unwrap().to_geo_json(),
1✔
614
            serde_json::json!({
1✔
615
                "type": "FeatureCollection",
1✔
616
                "features": [{
1✔
617
                    "type": "Feature",
1✔
618
                    "geometry": {
1✔
619
                        "type": "Point",
1✔
620
                        "coordinates": [0.0, 1.0]
1✔
621
                    },
1✔
622
                    "properties": {},
1✔
623
                    "when": {
1✔
624
                        "start": "-262143-01-01T00:00:00+00:00",
1✔
625
                        "end": "+262142-12-31T23:59:59.999+00:00",
1✔
626
                        "type": "Interval"
1✔
627
                    }
1✔
628
                }, {
1✔
629
                    "type": "Feature",
1✔
630
                    "geometry": {
1✔
631
                        "type": "Point",
1✔
632
                        "coordinates": [2.0, 3.0]
1✔
633
                    },
1✔
634
                    "properties": {},
1✔
635
                    "when": {
1✔
636
                        "start": "-262143-01-01T00:00:00+00:00",
1✔
637
                        "end": "+262142-12-31T23:59:59.999+00:00",
1✔
638
                        "type": "Interval"
1✔
639
                    }
1✔
640
                }]
1✔
641
            })
1✔
642
            .to_string()
1✔
643
        );
1✔
644
    }
1✔
645

646
    #[test]
647
    fn operator() {
1✔
648
        let mut temp_file = tempfile::NamedTempFile::new().unwrap();
1✔
649
        write!(
1✔
650
            temp_file,
1✔
651
            "\
1✔
652
x;y
1✔
653
0;1
1✔
654
2;3
1✔
655
4;5
1✔
656
"
1✔
657
        )
1✔
658
        .unwrap();
1✔
659
        temp_file.seek(SeekFrom::Start(0)).unwrap();
1✔
660

1✔
661
        let params = CsvSourceParameters {
1✔
662
            file_path: temp_file.path().into(),
1✔
663
            field_separator: ';',
1✔
664
            geometry: CsvGeometrySpecification::XY {
1✔
665
                x: "x".into(),
1✔
666
                y: "y".into(),
1✔
667
            },
1✔
668
            time: CsvTimeSpecification::None,
1✔
669
        };
1✔
670

1✔
671
        let operator = CsvSource { params }.boxed();
1✔
672

1✔
673
        let operator_json = serde_json::to_value(&operator).unwrap();
1✔
674

1✔
675
        assert_eq!(
1✔
676
            operator_json,
1✔
677
            serde_json::json!({
1✔
678
                "type": "CsvSource",
1✔
679
                "params": {
1✔
680
                    "filePath": temp_file.path(),
1✔
681
                    "fieldSeparator": ";",
1✔
682
                    "geometry": {
1✔
683
                        "type": "xy",
1✔
684
                        "x": "x",
1✔
685
                        "y": "y"
1✔
686
                    },
1✔
687
                    "time": "None"
1✔
688
                }
1✔
689
            })
1✔
690
        );
1✔
691

692
        let _operator: Box<dyn VectorOperator> = serde_json::from_value(operator_json).unwrap();
1✔
693
    }
1✔
694
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc