• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Freyskeyd / chekov / 4523955526

pending completion
4523955526

push

github

Freyskeyd
feat(update-struct-and-enum-to-use-u64-instead-of-i64): Following the changes and improvements from sqlx we can now use u64 in place of i64

25 of 25 new or added lines in 11 files covered. (100.0%)

4343 of 9076 relevant lines covered (47.85%)

100.84 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

23.0
/crates/event_store-backend-postgres/src/sql.rs
1
use async_stream::try_stream;
×
2
use event_store_core::event::RecordedEvent;
3
use event_store_core::event::UnsavedEvent;
4
use event_store_core::stream::Stream;
5
use futures::StreamExt;
6
use sqlx::pool::PoolConnection;
7
use sqlx::postgres::PgRow;
8
use sqlx::Postgres;
9
use sqlx::Row;
10
use std::convert::TryInto;
11
use tracing::trace;
12
use uuid::Uuid;
13

14
use crate::error::PostgresBackendError;
15

16
const STREAM_FORWARD: &str = r#"SELECT
17
        stream_events.stream_version as event_number,
18
        events.event_id as event_uuid,
19
        streams.stream_uuid,
20
        stream_events.original_stream_version as stream_version,
21
        events.event_type::text,
22
        events.correlation_id,
23
        events.causation_id,
24
        events.data::jsonb,
25
        events.metadata::text,
26
        events.created_at
27
    FROM
28
        stream_events
29
        inner JOIN streams ON streams.stream_id = stream_events.original_stream_id
30
        inner JOIN events ON stream_events.event_id = events.event_id
31
    WHERE
32
        stream_events.stream_uuid = $1 
33
        ORDER BY stream_events.stream_version ASC
34
        OFFSET $2
35
        LIMIT $3;
36
         "#;
37

38
pub fn stream_forward(
×
39
    mut conn: PoolConnection<Postgres>,
40
    stream_uuid: String,
41
    batch_size: i8,
42
) -> std::pin::Pin<Box<dyn futures::Stream<Item = Result<Vec<RecordedEvent>, sqlx::Error>> + Send>>
43
{
44
    try_stream! {
×
45
        let mut offset = 0;
46

47
        loop {
48
            let q = sqlx::query_as(STREAM_FORWARD);
49
            let inner = q.bind(&stream_uuid).bind(offset).bind(batch_size).fetch_all(&mut *conn).await;
50

51
            match inner {
52
                Ok(events) if events.len() > 0 => yield events,
53
                _ => break,
54
            }
55

56
            offset += batch_size;
57
        }
58
    }.boxed()
59
}
×
60

61
pub async fn read_stream(
×
62
    conn: impl sqlx::Executor<'_, Database = sqlx::Postgres>,
×
63
    stream_id: u64,
×
64
    version: usize,
×
65
    limit: usize,
×
66
) -> Result<Vec<RecordedEvent>, sqlx::Error> {
×
67
    let stream_id: i64 = stream_id.try_into().unwrap();
×
68
    let version: i64 = version.try_into().unwrap();
×
69
    let limit: i64 = limit.try_into().unwrap();
×
70
    trace!("Version {}, Limit: {}", version, limit);
×
71
    sqlx::query_as::<_, RecordedEvent>(
×
72
        r#"SELECT
73
        stream_events.stream_version as event_number,
74
        events.event_id as event_uuid,
75
        streams.stream_uuid,
76
        stream_events.original_stream_version as stream_version,
77
        events.event_type::text,
78
        events.correlation_id,
79
        events.causation_id,
80
        events.data::jsonb,
81
        events.metadata::text,
82
        events.created_at
83
    FROM
84
        stream_events
85
        inner JOIN streams ON streams.stream_id = stream_events.original_stream_id
86
        inner JOIN events ON stream_events.event_id = events.event_id
87
    WHERE
88
        stream_events.stream_id = $1 AND stream_events.stream_version >=$2
89
        ORDER BY stream_events.stream_version ASC
90
        LIMIT $3;
91
         "#,
92
    )
93
    .bind(stream_id)
94
    .bind(version)
×
95
    .bind(limit)
×
96
    .fetch_all(conn)
×
97
    .await
×
98
}
×
99

100
pub async fn stream_info(
×
101
    conn: impl sqlx::Executor<'_, Database = sqlx::Postgres>,
×
102
    stream_uuid: &str,
×
103
) -> Result<Stream, sqlx::Error> {
×
104
    sqlx::query_as::<_, Stream>(
×
105
        "SELECT stream_id, stream_uuid, stream_version, created_at, deleted_at FROM streams WHERE stream_uuid = $1"
106
    ).bind(stream_uuid)
107
        .fetch_one(conn)
×
108
        .await
×
109
}
×
110

111
pub async fn create_stream(
×
112
    pool: impl sqlx::Executor<'_, Database = sqlx::Postgres>,
×
113
    stream_uuid: &str,
×
114
) -> Result<Stream, PostgresBackendError> {
×
115
    Ok(sqlx::query_as::<_, Stream>("INSERT INTO streams (stream_uuid) VALUES ($1) RETURNING stream_id, stream_uuid, stream_version, created_at, deleted_at")
×
116
            .bind(stream_uuid)
117
            .fetch_one(pool)
×
118
            .await?)
×
119
}
×
120

121
fn create_append_indexes(events: usize) -> String {
1✔
122
    (0..events)
1✔
123
        .map(|i| {
2✔
124
            format!(
14✔
125
                "(${}, ${}, ${}, ${}, ${}::jsonb, ${}::jsonb, ${}::timestamp)",
126
                (i * 7) + 1,
2✔
127
                (i * 7) + 2,
2✔
128
                (i * 7) + 3,
2✔
129
                (i * 7) + 4,
2✔
130
                (i * 7) + 5,
2✔
131
                (i * 7) + 6,
2✔
132
                (i * 7) + 7
2✔
133
            )
134
        })
2✔
135
        .collect::<Vec<String>>()
136
        .join(",")
137
}
1✔
138

139
fn create_event_id_stream_version_indexes(starting_at: usize, length: usize) -> String {
×
140
    (0..length)
×
141
        .map(|i| match i {
×
142
            0 => format!("(${}::uuid, ${}::bigint)", starting_at, starting_at + 1),
×
143
            event_number => {
144
                let index = (event_number * 2) + starting_at;
×
145
                format!("(${}, ${})", index, index + 1)
×
146
            }
×
147
        })
×
148
        .collect::<Vec<String>>()
149
        .join(",")
150
}
×
151

152
pub async fn insert_events(
×
153
    conn: impl sqlx::Executor<'_, Database = sqlx::Postgres>,
×
154
    stream_uuid: &str,
×
155
    events: &[UnsavedEvent],
×
156
) -> Result<Vec<Uuid>, sqlx::Error> {
×
157
    let q = format!(
×
158
        r#"
159
WITH
160
  inserted_events AS (
161
    INSERT INTO events (event_id, event_type, causation_id, correlation_id, data, metadata, created_at)
162
    VALUES {} RETURNING *
163
  ),
164

165
  updated_stream AS (
166
    UPDATE
167
      streams
168
    SET
169
      stream_version = stream_version + {event_number}
170
    FROM inserted_events
171
    WHERE
172
      streams.stream_uuid = '{}' RETURNING streams.stream_id,
173
      stream_version,
174
      stream_version - {event_number} AS initial_stream_version,
175
      stream_uuid
176
  ),
177

178
  events_mapping (event_id, stream_version) AS (VALUES {events}),
179

180
  insert_stream_events AS (
181
    INSERT INTO stream_events (event_id, stream_id, stream_version, original_stream_id, original_stream_version)
182
    SELECT events_mapping.event_id, updated_stream.stream_id, events_mapping.stream_version, updated_stream.stream_id, events_mapping.stream_version
183
    FROM events_mapping, updated_stream RETURNING *
184
  )
185

186
SELECT
187
  inserted_events.event_id as event_uuid
188
FROM
189
  inserted_events;
190
  "#,
191
        create_append_indexes(events.len()),
×
192
        &stream_uuid,
×
193
        event_number = events.len(),
×
194
        events = create_event_id_stream_version_indexes(events.len() * 7 + 1, events.len()),
×
195
    );
196

197
    let mut query = sqlx::query(&q);
×
198
    for event in events {
×
199
        query = query
×
200
            .bind(event.event_uuid)
×
201
            .bind(&event.event_type)
×
202
            .bind(event.causation_id)
×
203
            .bind(event.correlation_id)
×
204
            .bind(&event.data)
×
205
            .bind(&event.metadata)
×
206
            .bind(event.created_at);
×
207
    }
208
    for event in events.iter() {
×
209
        query = query
×
210
            .bind(event.event_uuid)
×
211
            .bind(event.stream_version as i64);
×
212
    }
213

214
    query.map(|row: PgRow| row.get(0)).fetch_all(conn).await
×
215
}
×
216

217
#[cfg(test)]
218
mod test {
219

220
    use event_store_core::event::{Event, RecordedEvent, UnsavedEvent};
221
    use serde::{Deserialize, Serialize};
222

223
    use pretty_assertions::assert_eq;
224

225
    use crate::sql::create_append_indexes;
226

227
    #[derive(Serialize, Deserialize)]
2✔
228
    struct MyEvent {}
229
    impl Event for MyEvent {
230
        fn event_type(&self) -> &'static str {
2✔
231
            "MyEvent"
232
        }
2✔
233

234
        fn all_event_types() -> Vec<&'static str> {
×
235
            vec!["MyEvent"]
×
236
        }
×
237
    }
238

239
    impl std::convert::TryFrom<RecordedEvent> for MyEvent {
240
        type Error = ();
241
        fn try_from(e: RecordedEvent) -> Result<Self, Self::Error> {
×
242
            serde_json::from_value(e.data).map_err(|_| ())
×
243
        }
×
244
    }
245

246
    #[test]
247
    fn should_produce_the_right_number_of_arguments() {
2✔
248
        let e = MyEvent {};
249
        let events: Vec<UnsavedEvent> = vec![
2✔
250
            UnsavedEvent::try_from(&e).unwrap(),
1✔
251
            UnsavedEvent::try_from(&e).unwrap(),
1✔
252
        ];
253
        assert_eq!(
1✔
254
            "($1, $2, $3, $4, $5::jsonb, $6::jsonb, $7::timestamp),($8, $9, $10, $11, $12::jsonb, $13::jsonb, $14::timestamp)",
255
            create_append_indexes(events.len())
1✔
256
        );
257
    }
2✔
258
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc