• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

evolution-gaming / kafka-journal / 11837553250

14 Nov 2024 12:50PM CUT coverage: 83.298% (+0.2%) from 83.145%
11837553250

Pull #697

web-flow
Merge 1de7acaf5 into 916090a2c
Pull Request #697: make `version` non-optional

7 of 10 new or added lines in 5 files covered. (70.0%)

7 existing lines in 7 files now uncovered.

3167 of 3802 relevant lines covered (83.3%)

1.41 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

74.14
/journal/src/main/scala/com/evolutiongaming/kafka/journal/eventual/ReplicatedPartitionJournal.scala
1
package com.evolutiongaming.kafka.journal.eventual
2

3
import cats.effect.Resource
4
import cats.effect.syntax.all.*
5
import cats.syntax.all.*
6
import cats.{Applicative, Monad, ~>}
7
import com.evolutiongaming.catshelper.{BracketThrowable, Log, MeasureDuration, MonadThrowable}
8
import com.evolutiongaming.kafka.journal.*
9
import com.evolutiongaming.skafka.{Offset, Partition, Topic}
10

11
import java.time.Instant
12

13
trait ReplicatedPartitionJournal[F[_]] {
14

15
  def offsets: ReplicatedPartitionJournal.Offsets[F]
16

17
  def journal(id: String): Resource[F, ReplicatedKeyJournal[F]]
18
}
19

20
object ReplicatedPartitionJournal {
21

22
  def empty[F[_]: Applicative]: ReplicatedPartitionJournal[F] = {
23
    class Empty
24
    new Empty with ReplicatedPartitionJournal[F] {
×
25

26
      def offsets: Offsets[F] = {
27
        new Empty with Offsets[F] {
×
28

29
          def get = none[Offset].pure[F]
×
30

31
          def create(offset: Offset, timestamp: Instant) = ().pure[F]
×
32

33
          def update(offset: Offset, timestamp: Instant) = ().pure[F]
×
34
        }
35
      }
36

37
      def journal(id: String) = {
38
        ReplicatedKeyJournal
39
          .empty[F]
40
          .pure[F]
×
41
          .toResource
×
42
      }
43
    }
44
  }
45

46
  trait Offsets[F[_]] {
47
    def get: F[Option[Offset]]
48

49
    def create(offset: Offset, timestamp: Instant): F[Unit]
50

51
    def update(offset: Offset, timestamp: Instant): F[Unit]
52
  }
53

54
  private sealed abstract class WithLog
55

56
  private sealed abstract class WithMetrics
57

58
  private sealed abstract class EnhanceError
59

60
  private sealed abstract class MapK
61

62
  implicit class ReplicatedPartitionJournalOps[F[_]](val self: ReplicatedPartitionJournal[F]) extends AnyVal {
63

64
    def mapK[G[_]](f: F ~> G)(implicit B: BracketThrowable[F], GT: BracketThrowable[G]): ReplicatedPartitionJournal[G] = {
65
      new MapK with ReplicatedPartitionJournal[G] {
×
66

67
        def offsets: Offsets[G] = {
68
          new MapK with Offsets[G] {
×
69

70
            def get = f(self.offsets.get)
×
71

72
            def create(offset: Offset, timestamp: Instant) = f(self.offsets.create(offset, timestamp))
×
73

74
            def update(offset: Offset, timestamp: Instant) = f(self.offsets.update(offset, timestamp))
×
75
          }
76
        }
77

78
        def journal(id: String) = {
79
          self
80
            .journal(id)
81
            .map(_.mapK(f))
82
            .mapK(f)
×
83
        }
84
      }
85
    }
86

87
    def withLog(topic: Topic, partition: Partition, log: Log[F])(
88
      implicit F: Monad[F],
89
      measureDuration: MeasureDuration[F],
90
    ): ReplicatedPartitionJournal[F] = {
91
      new WithLog with ReplicatedPartitionJournal[F] {
2✔
92

93
        def offsets: Offsets[F] = {
94
          new WithLog with Offsets[F] {
1✔
95

96
            def get = {
97
              for {
98
                d <- MeasureDuration[F].start
1✔
99
                r <- self.offsets.get
1✔
100
                d <- d
1✔
101
                _ <- log.debug(s"$topic offsets.get in ${d.toMillis}ms, partition: $partition, result: $r")
1✔
102
              } yield r
1✔
103
            }
104

105
            def create(offset: Offset, timestamp: Instant) = {
106
              for {
107
                d <- MeasureDuration[F].start
1✔
108
                r <- self.offsets.create(offset, timestamp)
1✔
109
                d <- d
1✔
110
                _ <- log.debug(
1✔
111
                  s"$topic offsets.create in ${d.toMillis}ms, partition: $partition, offset: $offset, timestamp: $timestamp",
112
                )
113
              } yield r
1✔
114
            }
115

116
            def update(offset: Offset, timestamp: Instant) = {
117
              for {
118
                d <- MeasureDuration[F].start
1✔
119
                r <- self.offsets.update(offset, timestamp)
1✔
120
                d <- d
1✔
121
                _ <- log.debug(
1✔
122
                  s"$topic offsets.update in ${d.toMillis}ms, partition: $partition, offset: $offset, timestamp: $timestamp",
123
                )
124
              } yield r
1✔
125
            }
126
          }
127
        }
128

129
        def journal(id: String) = {
130
          self
131
            .journal(id)
132
            .map { _.withLog(Key(id = id, topic = topic), partition, log) }
2✔
133
        }
134
      }
135
    }
136

137
    def withMetrics(
138
      topic: Topic,
139
      metrics: ReplicatedJournal.Metrics[F],
140
    )(implicit F: Monad[F], measureDuration: MeasureDuration[F]): ReplicatedPartitionJournal[F] = {
141
      new WithMetrics with ReplicatedPartitionJournal[F] {
2✔
142

143
        def offsets: Offsets[F] = {
144
          new WithMetrics with Offsets[F] {
2✔
145

146
            def get: F[Option[Offset]] = {
147
              for {
148
                d <- MeasureDuration[F].start
2✔
149
                r <- self.offsets.get
1✔
150
                d <- d
1✔
151
                _ <- metrics.offsetsGet(d)
1✔
152
              } yield r
1✔
153
            }
154

155
            def create(offset: Offset, timestamp: Instant) = {
156
              for {
157
                d <- MeasureDuration[F].start
1✔
158
                r <- self.offsets.create(offset, timestamp)
1✔
159
                d <- d
1✔
160
                _ <- metrics.offsetsCreate(topic, d)
1✔
161
              } yield r
1✔
162
            }
163

164
            def update(offset: Offset, timestamp: Instant) = {
165
              for {
166
                d <- MeasureDuration[F].start
2✔
167
                r <- self.offsets.update(offset, timestamp)
1✔
168
                d <- d
1✔
169
                _ <- metrics.offsetsUpdate(topic, d)
1✔
170
              } yield r
1✔
171
            }
172
          }
173
        }
174

175
        def journal(id: String) = {
176
          self
177
            .journal(id)
178
            .map { _.withMetrics(topic, metrics) }
1✔
179
        }
180
      }
181
    }
182

183
    def enhanceError(topic: Topic, partition: Partition)(implicit F: MonadThrowable[F]): ReplicatedPartitionJournal[F] = {
184

185
      def journalError(msg: String, cause: Throwable) = {
186
        JournalError(s"ReplicatedPartitionJournal.$msg failed with $cause", cause)
×
187
      }
188

189
      new EnhanceError with ReplicatedPartitionJournal[F] {
2✔
190

191
        def offsets: Offsets[F] = {
192
          new EnhanceError with Offsets[F] {
1✔
193

194
            def get: F[Option[Offset]] = {
195
              self
196
                .offsets
197
                .get
198
                .adaptError { case a => journalError(s"offsets.get topic: $topic, partition: $partition", a) }
1✔
199
            }
200

201
            def create(offset: Offset, timestamp: Instant) = {
202
              self
203
                .offsets
204
                .create(offset, timestamp)
205
                .adaptError {
1✔
206
                  case a =>
207
                    journalError(
208
                      s"offsets.create " +
209
                        s"topic: $topic, " +
210
                        s"partition: $partition, " +
211
                        s"offset: $offset, " +
212
                        s"timestamp: $timestamp",
213
                      a,
214
                    )
215
                }
216
            }
217

218
            def update(offset: Offset, timestamp: Instant) = {
219
              self
220
                .offsets
221
                .update(offset, timestamp)
222
                .adaptError {
1✔
223
                  case a =>
224
                    journalError(
225
                      s"offsets.update " +
226
                        s"topic: $topic, " +
227
                        s"partition: $partition, " +
228
                        s"offset: $offset, " +
229
                        s"timestamp: $timestamp",
230
                      a,
231
                    )
232
                }
233
            }
234
          }
235
        }
236

237
        def journal(id: String) = {
238
          val key = Key(id = id, topic = topic)
2✔
239
          self
240
            .journal(id)
241
            .map { _.enhanceError(key, partition) }
2✔
UNCOV
242
            .adaptError { case a => journalError(s"journal key: $key", a) }
×
243
        }
244
      }
245
    }
246
  }
247
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc