Freyskeyd / chekov / build 4514550503 (push via GitHub by Freyskeyd, pending completion)

refactor: remove old allowed clippy condition

4316 of 9195 relevant lines covered (46.94%), 99.47 hits per line
Source File: /crates/event_store-backend-postgres/src/sql.rs (23.71% covered)

use async_stream::try_stream;
use event_store_core::event::RecordedEvent;
use event_store_core::event::UnsavedEvent;
use event_store_core::stream::Stream;
use futures::StreamExt;
use log::trace;
use sqlx::pool::PoolConnection;
use sqlx::postgres::PgRow;
use sqlx::Postgres;
use sqlx::Row;
use std::convert::TryInto;
use uuid::Uuid;

use crate::error::PostgresBackendError;

const STREAM_FORWARD: &str = r#"SELECT
        stream_events.stream_version as event_number,
        events.event_id as event_uuid,
        streams.stream_uuid,
        stream_events.original_stream_version as stream_version,
        events.event_type::text,
        events.correlation_id,
        events.causation_id,
        events.data::jsonb,
        events.metadata::text,
        events.created_at
    FROM
        stream_events
        inner JOIN streams ON streams.stream_id = stream_events.original_stream_id
        inner JOIN events ON stream_events.event_id = events.event_id
    WHERE
        stream_events.stream_uuid = $1
        ORDER BY stream_events.stream_version ASC
        OFFSET $2
        LIMIT $3;
         "#;

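// Streams a stream's events forward in pages of `batch_size`, starting at
// offset 0 and yielding each non-empty page as a `Vec<RecordedEvent>`. A
// query error or an empty page ends the stream. Reported uncovered in this
// build.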
pub fn stream_forward(
    mut conn: PoolConnection<Postgres>,
    stream_uuid: String,
    batch_size: i8,
) -> std::pin::Pin<Box<dyn futures::Stream<Item = Result<Vec<RecordedEvent>, sqlx::Error>> + Send>>
{
    try_stream! {
        let mut offset = 0;

        loop {
            let q = sqlx::query_as(STREAM_FORWARD);
            let inner = q.bind(&stream_uuid).bind(offset).bind(batch_size).fetch_all(&mut *conn).await;

            match inner {
                Ok(events) if !events.is_empty() => yield events,
                _ => break,
            }

            offset += batch_size;
        }
    }.boxed()
}

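// Reads up to `limit` events of a stream (addressed by its numeric
// `stream_id`), starting at `stream_version >= version`, ordered ascending.
// `version` and `limit` are converted to i64 with `unwrap`, so values that do
// not fit in i64 would panic. Reported uncovered in this build.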
pub async fn read_stream(
    conn: impl sqlx::Executor<'_, Database = sqlx::Postgres>,
    stream_id: i64,
    version: usize,
    limit: usize,
) -> Result<Vec<RecordedEvent>, sqlx::Error> {
    let version: i64 = version.try_into().unwrap();
    let limit: i64 = limit.try_into().unwrap();
    trace!("Version {}, Limit: {}", version, limit);
    sqlx::query_as::<_, RecordedEvent>(
        r#"SELECT
        stream_events.stream_version as event_number,
        events.event_id as event_uuid,
        streams.stream_uuid,
        stream_events.original_stream_version as stream_version,
        events.event_type::text,
        events.correlation_id,
        events.causation_id,
        events.data::jsonb,
        events.metadata::text,
        events.created_at
    FROM
        stream_events
        inner JOIN streams ON streams.stream_id = stream_events.original_stream_id
        inner JOIN events ON stream_events.event_id = events.event_id
    WHERE
        stream_events.stream_id = $1 AND stream_events.stream_version >= $2
        ORDER BY stream_events.stream_version ASC
        LIMIT $3;
         "#,
    )
    .bind(stream_id)
    .bind(version)
    .bind(limit)
    .fetch_all(conn)
    .await
}

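// Looks up a single row in `streams` by `stream_uuid`; `fetch_one` returns
// `sqlx::Error::RowNotFound` if the stream does not exist. Reported uncovered
// in this build.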
pub async fn stream_info(
    conn: impl sqlx::Executor<'_, Database = sqlx::Postgres>,
    stream_uuid: &str,
) -> Result<Stream, sqlx::Error> {
    sqlx::query_as::<_, Stream>(
        "SELECT stream_id, stream_uuid, stream_version, created_at, deleted_at FROM streams WHERE stream_uuid = $1",
    )
    .bind(stream_uuid)
    .fetch_one(conn)
    .await
}

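// Creates a new row in `streams` for the given `stream_uuid` and returns the
// inserted `Stream`; database errors are converted into `PostgresBackendError`
// through the `?` operator. Reported uncovered in this build.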
pub async fn create_stream(
    pool: impl sqlx::Executor<'_, Database = sqlx::Postgres>,
    stream_uuid: &str,
) -> Result<Stream, PostgresBackendError> {
    Ok(sqlx::query_as::<_, Stream>("INSERT INTO streams (stream_uuid) VALUES ($1) RETURNING stream_id, stream_uuid, stream_version, created_at, deleted_at")
        .bind(stream_uuid)
        .fetch_one(pool)
        .await?)
}

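// Builds the VALUES placeholder list for the `events` insert, seven bind
// parameters per event. For two events this produces
// "($1, $2, $3, $4, $5::jsonb, $6::jsonb, $7::timestamp),($8, $9, $10, $11, $12::jsonb, $13::jsonb, $14::timestamp)"
// (see the test at the bottom of this file). Covered by that test in this
// build.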
fn create_append_indexes(events: usize) -> String {
    (0..events)
        .map(|i| {
            format!(
                "(${}, ${}, ${}, ${}, ${}::jsonb, ${}::jsonb, ${}::timestamp)",
                (i * 7) + 1,
                (i * 7) + 2,
                (i * 7) + 3,
                (i * 7) + 4,
                (i * 7) + 5,
                (i * 7) + 6,
                (i * 7) + 7
            )
        })
        .collect::<Vec<String>>()
        .join(",")
}

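// Builds the placeholder list for the `events_mapping` CTE: one
// `(event_id, stream_version)` pair per event, numbered from `starting_at`.
// Only the first pair carries explicit `::uuid`/`::bigint` casts; later pairs
// are left uncast. For two events starting at $15 this yields
// "($15::uuid, $16::bigint),($17, $18)". Reported uncovered in this build.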
fn create_event_id_stream_version_indexes(starting_at: usize, length: usize) -> String {
    (0..length)
        .map(|i| match i {
            0 => format!("(${}::uuid, ${}::bigint)", starting_at, starting_at + 1),
            event_number => {
                let index = (event_number * 2) + starting_at;
                format!("(${}, ${})", index, index + 1)
            }
        })
        .collect::<Vec<String>>()
        .join(",")
}

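// Appends a batch of `UnsavedEvent`s to a stream in a single round trip: one
// CTE inserts the event rows, a second bumps `streams.stream_version`, and a
// third links each event to the stream via `stream_events`; the returned Vec
// holds the UUIDs of the inserted events. Bind parameters are laid out as
// seven per event (from `create_append_indexes`) followed by one
// `(event_id, stream_version)` pair per event. Note that `stream_uuid` is
// interpolated into the SQL text with `format!` rather than bound as a
// parameter. Reported uncovered in this build.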
pub async fn insert_events(
    conn: impl sqlx::Executor<'_, Database = sqlx::Postgres>,
    stream_uuid: &str,
    events: &[UnsavedEvent],
) -> Result<Vec<Uuid>, sqlx::Error> {
    let q = format!(
        r#"
WITH
  inserted_events AS (
    INSERT INTO events (event_id, event_type, causation_id, correlation_id, data, metadata, created_at)
    VALUES {} RETURNING *
  ),

  updated_stream AS (
    UPDATE
      streams
    SET
      stream_version = stream_version + {event_number}
    FROM inserted_events
    WHERE
      streams.stream_uuid = '{}' RETURNING streams.stream_id,
      stream_version,
      stream_version - {event_number} AS initial_stream_version,
      stream_uuid
  ),

  events_mapping (event_id, stream_version) AS (VALUES {events}),

  insert_stream_events AS (
    INSERT INTO stream_events (event_id, stream_id, stream_version, original_stream_id, original_stream_version)
    SELECT events_mapping.event_id, updated_stream.stream_id, events_mapping.stream_version, updated_stream.stream_id, events_mapping.stream_version
    FROM events_mapping, updated_stream RETURNING *
  )

SELECT
  inserted_events.event_id as event_uuid
FROM
  inserted_events;
  "#,
        create_append_indexes(events.len()),
        &stream_uuid,
        event_number = events.len(),
        events = create_event_id_stream_version_indexes(events.len() * 7 + 1, events.len()),
    );

    let mut query = sqlx::query(&q);
    for event in events {
        query = query
            .bind(event.event_uuid)
            .bind(&event.event_type)
            .bind(event.causation_id)
            .bind(event.correlation_id)
            .bind(&event.data)
            .bind(&event.metadata)
            .bind(event.created_at);
    }
    for event in events.iter() {
        query = query.bind(event.event_uuid).bind(event.stream_version);
    }

    query.map(|row: PgRow| row.get(0)).fetch_all(conn).await
}

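// The test below exercises `create_append_indexes` only, which matches the
// coverage report: it is the only non-test function in this file with
// recorded hits.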
#[cfg(test)]
mod test {

    use event_store_core::event::{Event, RecordedEvent, UnsavedEvent};
    use serde::{Deserialize, Serialize};

    use pretty_assertions::assert_eq;

    use crate::sql::create_append_indexes;

    #[derive(Serialize, Deserialize)]
    struct MyEvent {}
    impl Event for MyEvent {
        fn event_type(&self) -> &'static str {
            "MyEvent"
        }

        fn all_event_types() -> Vec<&'static str> {
            vec!["MyEvent"]
        }
    }

    impl std::convert::TryFrom<RecordedEvent> for MyEvent {
        type Error = ();
        fn try_from(e: RecordedEvent) -> Result<Self, Self::Error> {
            serde_json::from_value(e.data).map_err(|_| ())
        }
    }

    #[test]
    fn should_produce_the_right_number_of_arguments() {
        let e = MyEvent {};
        let events: Vec<UnsavedEvent> = vec![
            UnsavedEvent::try_from(&e).unwrap(),
            UnsavedEvent::try_from(&e).unwrap(),
        ];
        assert_eq!(
            "($1, $2, $3, $4, $5::jsonb, $6::jsonb, $7::timestamp),($8, $9, $10, $11, $12::jsonb, $13::jsonb, $14::timestamp)",
            create_append_indexes(events.len())
        );
    }
}