• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 4594239788

pending completion
4594239788

Pull #772

github

GitHub
Merge 93719774d into 75538c8bc
Pull Request #772: bencher

134 of 134 new or added lines in 1 file covered. (100.0%)

96125 of 107670 relevant lines covered (89.28%)

72821.51 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/operators/src/util/workflow_bencher/mod.rs
1
use std::{
2
    hint::black_box,
3
    time::{Duration, Instant},
4
};
5

6
use futures::TryStreamExt;
7

8
use geoengine_datatypes::primitives::{QueryRectangle, RasterQueryRectangle};
9

10
use crate::call_on_generic_raster_processor;
11
use crate::engine::{ChunkByteSize, MockExecutionContext, RasterOperator, RasterQueryProcessor};
12

13
use geoengine_datatypes::{
14
    primitives::SpatialPartition2D,
15
    raster::{GridSize, TilingSpecification},
16
};
17

18
use serde::{Serialize, Serializer};
19

20
mod macros;
21

22
pub struct WorkflowBenchmarkCollector {
23
    pub writer: csv::Writer<std::io::Stdout>,
24
}
25

26
impl WorkflowBenchmarkCollector {
27
    pub fn add_benchmark_result(&mut self, result: WorkflowBenchmarkResult) {
×
28
        self.writer.serialize(result).unwrap();
×
29
        self.writer.flush().unwrap();
×
30
    }
×
31
}
32

33
impl Default for WorkflowBenchmarkCollector {
34
    fn default() -> Self {
×
35
        WorkflowBenchmarkCollector {
×
36
            writer: csv::Writer::from_writer(std::io::stdout()),
×
37
        }
×
38
    }
×
39
}
40

41
pub trait BenchmarkRunner {
42
    fn run_all_benchmarks(self, bencher: &mut WorkflowBenchmarkCollector);
43
}
44

45
pub struct WorkflowSingleBenchmark<F, O> {
46
    bench_id: &'static str,
47
    query_name: &'static str,
48
    query_rect: QueryRectangle<SpatialPartition2D>,
49
    tiling_spec: TilingSpecification,
50
    chunk_byte_size: ChunkByteSize,
51
    num_threads: usize,
52
    operator_builder: O,
53
    context_builder: F,
54
}
55

56
impl<O, F> WorkflowSingleBenchmark<F, O>
57
where
58
    F: Fn(TilingSpecification, usize) -> MockExecutionContext,
59
    O: Fn(TilingSpecification, QueryRectangle<SpatialPartition2D>) -> Box<dyn RasterOperator>,
60
{
61
    #[inline(never)]
62
    pub fn run_bench(&self) -> WorkflowBenchmarkResult {
×
63
        let run_time = tokio::runtime::Runtime::new().unwrap();
×
64
        let exe_ctx = (self.context_builder)(self.tiling_spec, self.num_threads);
×
65
        let ctx = exe_ctx.mock_query_context(self.chunk_byte_size);
×
66

×
67
        let operator = (self.operator_builder)(self.tiling_spec, self.query_rect);
×
68
        let init_start = Instant::now();
×
69
        let initialized_operator = run_time.block_on(async { operator.initialize(&exe_ctx).await });
×
70
        let init_elapsed = init_start.elapsed();
×
71

×
72
        let initialized_queryprocessor = initialized_operator.unwrap().query_processor().unwrap();
×
73

74
        call_on_generic_raster_processor!(initialized_queryprocessor, op => { run_time.block_on(async {
×
75
                let start_query = Instant::now();
×
76
                // query the operator
77
                let query = op
×
78
                    .raster_query(self.query_rect, &ctx)
×
79
                    .await
×
80
                    .unwrap();
×
81
                let query_elapsed = start_query.elapsed();
×
82

×
83
                let start = Instant::now();
×
84
                // drain the stream
85
                let tile_count_res: Result<usize,_> = query.try_fold(0, |accu, tile|  async move {
×
86
                    black_box({
×
87
                        let _ = tile;
×
88
                        Ok(accu + 1)
×
89
                    })
×
90
                }).await;
×
91

92
                let elapsed = start.elapsed();
×
93
                let number_of_tiles = tile_count_res.unwrap();
×
94

×
95
                WorkflowBenchmarkResult{
×
96
                    bench_id: self.bench_id,
×
97
                    num_threads: self.num_threads,
×
98
                    chunk_byte_size: self.chunk_byte_size,
×
99
                    qrect_name: self.query_name,
×
100
                    tile_size_x: self.tiling_spec.tile_size_in_pixels.axis_size_x(),
×
101
                    tile_size_y: self.tiling_spec.tile_size_in_pixels.axis_size_y(),
×
102
                    number_of_tiles,
×
103
                    init_time: init_elapsed,
×
104
                    query_time: query_elapsed,
×
105
                    stream_time: elapsed,
×
106
                }
×
107
            })
×
108
        })
109
    }
×
110
}
111

112
impl<F, O> BenchmarkRunner for WorkflowSingleBenchmark<F, O>
113
where
114
    F: Fn(TilingSpecification, usize) -> MockExecutionContext,
115
    O: Fn(TilingSpecification, QueryRectangle<SpatialPartition2D>) -> Box<dyn RasterOperator>,
116
{
117
    fn run_all_benchmarks(self, bencher: &mut WorkflowBenchmarkCollector) {
×
118
        bencher.add_benchmark_result(WorkflowSingleBenchmark::run_bench(&self))
×
119
    }
×
120
}
121

122
/// Description of a whole family of benchmarks.
///
/// One [`WorkflowSingleBenchmark`] is generated for every combination of the
/// configured thread counts, tiling specifications, chunk byte sizes and
/// named queries.
pub struct WorkflowMultiBenchmark<C, Q, T, B, O, F> {
    /// Identifier shared by all generated benchmarks.
    bench_id: &'static str,
    /// Collection of `(name, query rectangle)` pairs.
    named_querys: Q,
    /// Collection of tiling specifications.
    tiling_specs: T,
    /// Creates the operator that is benchmarked.
    operator_builder: O,
    /// Creates the execution context the operator runs in.
    context_builder: F,
    /// Collection of chunk byte sizes.
    chunk_byte_size: B,
    /// Collection of thread counts.
    num_threads: C,
}
131

132
impl<C, Q, T, B, O, F> WorkflowMultiBenchmark<C, Q, T, B, O, F>
133
where
134
    C: IntoIterator<Item = usize> + Clone,
135
    Q: IntoIterator<Item = (&'static str, RasterQueryRectangle)> + Clone,
136
    T: IntoIterator<Item = TilingSpecification> + Clone,
137
    B: IntoIterator<Item = ChunkByteSize> + Clone,
138
    F: Clone + Fn(TilingSpecification, usize) -> MockExecutionContext,
139
    O: Clone
140
        + Fn(TilingSpecification, QueryRectangle<SpatialPartition2D>) -> Box<dyn RasterOperator>,
141
{
142
    pub fn new(
×
143
        bench_id: &'static str,
×
144
        named_querys: Q,
×
145
        tiling_specs: T,
×
146
        operator_builder: O,
×
147
        context_builder: F,
×
148
        chunk_byte_size: B,
×
149
        num_threads: C,
×
150
    ) -> WorkflowMultiBenchmark<C, Q, T, B, O, F> {
×
151
        WorkflowMultiBenchmark {
×
152
            bench_id,
×
153
            named_querys,
×
154
            tiling_specs,
×
155
            operator_builder,
×
156
            context_builder,
×
157
            chunk_byte_size,
×
158
            num_threads,
×
159
        }
×
160
    }
×
161

162
    pub fn into_benchmark_iterator(self) -> impl Iterator<Item = WorkflowSingleBenchmark<F, O>> {
×
163
        let iter = self
×
164
            .num_threads
×
165
            .clone()
×
166
            .into_iter()
×
167
            .flat_map(move |num_threads| {
×
168
                self.tiling_specs
×
169
                    .clone()
×
170
                    .into_iter()
×
171
                    .map(move |tiling_spec| (num_threads, tiling_spec))
×
172
            });
×
173

×
174
        let iter = iter.flat_map(move |(num_threads, tiling_spec)| {
×
175
            self.chunk_byte_size
×
176
                .clone()
×
177
                .into_iter()
×
178
                .map(move |chunk_byte_size| (num_threads, tiling_spec, chunk_byte_size))
×
179
        });
×
180

×
181
        let iter = iter.flat_map(move |(num_threads, tiling_spec, chunk_byte_size)| {
×
182
            self.named_querys
×
183
                .clone()
×
184
                .into_iter()
×
185
                .map(move |(query_name, query_rect)| {
×
186
                    (
×
187
                        num_threads,
×
188
                        tiling_spec,
×
189
                        chunk_byte_size,
×
190
                        query_name,
×
191
                        query_rect,
×
192
                    )
×
193
                })
×
194
        });
×
195

×
196
        iter.map(
×
197
            move |(num_threads, tiling_spec, chunk_byte_size, query_name, query_rect)| {
×
198
                WorkflowSingleBenchmark {
×
199
                    bench_id: self.bench_id,
×
200
                    query_name,
×
201
                    query_rect,
×
202
                    tiling_spec,
×
203
                    chunk_byte_size,
×
204
                    num_threads,
×
205
                    operator_builder: self.operator_builder.clone(),
×
206
                    context_builder: self.context_builder.clone(),
×
207
                }
×
208
            },
×
209
        )
×
210
    }
×
211
}
212

213
impl<C, Q, T, B, O, F> BenchmarkRunner for WorkflowMultiBenchmark<C, Q, T, B, O, F>
214
where
215
    C: IntoIterator<Item = usize> + Clone,
216
    Q: IntoIterator<Item = (&'static str, RasterQueryRectangle)> + Clone,
217
    T: IntoIterator<Item = TilingSpecification> + Clone,
218
    B: IntoIterator<Item = ChunkByteSize> + Clone,
219
    F: Clone + Fn(TilingSpecification, usize) -> MockExecutionContext,
220
    O: Clone
221
        + Fn(TilingSpecification, QueryRectangle<SpatialPartition2D>) -> Box<dyn RasterOperator>,
222
{
223
    fn run_all_benchmarks(self, bencher: &mut WorkflowBenchmarkCollector) {
×
224
        self.into_benchmark_iterator()
×
225
            .for_each(|bench| bencher.add_benchmark_result(bench.run_bench()));
×
226
    }
×
227
}
228

229
fn serialize_duration_as_millis<S>(duration: &Duration, serializer: S) -> Result<S::Ok, S::Error>
×
230
where
×
231
    S: Serializer,
×
232
{
×
233
    serializer.serialize_u128(duration.as_millis())
×
234
}
×
235

236
#[derive(Debug, Serialize)]
×
237
pub struct WorkflowBenchmarkResult {
238
    pub bench_id: &'static str,
239
    pub num_threads: usize,
240
    pub chunk_byte_size: ChunkByteSize,
241
    pub qrect_name: &'static str,
242
    pub tile_size_x: usize,
243
    pub tile_size_y: usize,
244
    pub number_of_tiles: usize,
245
    #[serde(rename = "init_time_ms")]
246
    #[serde(serialize_with = "serialize_duration_as_millis")]
247
    pub init_time: Duration,
248
    #[serde(rename = "query_time_ms")]
249
    #[serde(serialize_with = "serialize_duration_as_millis")]
250
    pub query_time: Duration,
251
    #[serde(rename = "stream_time_ms")]
252
    #[serde(serialize_with = "serialize_duration_as_millis")]
253
    pub stream_time: Duration,
254
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc