• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

evolution-gaming / kafka-journal / 11837553250

14 Nov 2024 12:50PM CUT coverage: 83.298% (+0.2%) from 83.145%
11837553250

Pull #697

web-flow
Merge 1de7acaf5 into 916090a2c
Pull Request #697: make `version` non-optional

7 of 10 new or added lines in 5 files covered. (70.0%)

7 existing lines in 7 files now uncovered.

3167 of 3802 relevant lines covered (83.3%)

1.41 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.08
/journal/src/main/scala/com/evolutiongaming/kafka/journal/HeadInfo.scala
1
package com.evolutiongaming.kafka.journal
2

3
import cats.syntax.all.*
4
import cats.{Foldable, Semigroup}
5
import com.evolutiongaming.skafka.Offset
6

7
/** Minimal metainformation about the non-replicated events in a journal.
8
  *
9
  * If journal has a non-deleted events pending a replication, it will contain
10
  * [[Offset]] of the _first_ (i.e. oldest) event and [[SeqNr]] of the _last_
11
  * (i.e. newest) event in a form of [[HeadInfo.Append]] class, with
12
  * [[HeadInfo.Append#deleteTo]] specifying the part to be deleted, if any.
13
  *
14
  * If the journal was purged or all events were deleted, it will be
15
  * [[HeadInfo.Purge]] or [[HeadInfo.Delete]], accordingly.
16
  *
17
  * Having this information (by preparing it using [[HeadCache]]) allows one to
18
  * avoid re-reading Kafka during recovery, if it is [[HeadInfo.Empty]], or
19
  * start reading Kafka topic partition much later, by skipping events unrelated
20
  * to the journal.
21
  *
22
  * @see
23
  *   [[HeadCache]] for more details on how this information is being
24
  *   prefetched.
25
  * @see
26
  *   [[Journals]] for more details on how it is used.
27
  */
28
private[journal] sealed abstract class HeadInfo extends Product
29

30
private[journal] object HeadInfo {
31

32
  /** There are no non-replicated events in Kafka for specific journal.
33
    *
34
    * [[HeadInfo#empty]] is equivalent to [[HeadInfo.Empty]], but the inferred
35
    * type will be `HeadInfo` rather than `HeadInfo.Empty`, which might be a
36
    * little more convenient in some cases.
37
    *
38
    * Example:
39
    * {{{
40
    * scala> import com.evolutiongaming.kafka.journal.HeadInfo
41
    *
42
    * scala> HeadInfo.Empty
43
    * val res0: HeadInfo.Empty.type = Empty
44
    *
45
    * scala> HeadInfo.empty
46
    * val res1: HeadInfo = Empty
47
    * }}}
48
    *
49
    * @see [[HeadInfo.Empty]] for more details.
50
    */
51
  def empty: HeadInfo = Empty
1✔
52

53
  /** The only non-replicated records are delete actions.
54
    *
55
    * [[HeadInfo#delete]] is equivalent to [[HeadInfo.Delete]], but the inferred
56
    * type will be `HeadInfo` rather than `HeadInfo.Delete`, which might be a
57
    * little more convenient in some cases.
58
    *
59
    * Example:
60
    * {{{
61
    * scala> import com.evolutiongaming.kafka.journal.HeadInfo
62
    *
63
    * scala> HeadInfo.Delete(DeleteTo(SeqNr.min))
64
    * val res1: HeadInfo.Delete = Delete(1)
65
    *
66
    * scala> HeadInfo.delete(DeleteTo(SeqNr.min))
67
    * val res0: HeadInfo = Delete(1)
68
    * }}}
69
    *
70
    * @see [[HeadInfo.Delete]] for more details.
71
    */
72
  def delete(deleteTo: DeleteTo): HeadInfo = Delete(deleteTo)
1✔
73

74
  /** There are new appended events in Kafka, which did not replicate yet.
75
    *
76
    * [[HeadInfo#append]] is equivalent to [[HeadInfo.Append]], but the inferred
77
    * type will be `HeadInfo` rather than `HeadInfo.Append`, which might be a
78
    * little more convenient in some cases.
79
    *
80
    * Example:
81
    * {{{
82
    * scala> import com.evolutiongaming.kafka.journal.*
83
    * scala> import com.evolutiongaming.skafka.Offset
84
    *
85
    * scala> HeadInfo.Append(Offset.min, SeqNr.min, None)
86
    * val res0: HeadInfo.Append = Append(0,1,None)
87
    *
88
    * scala> HeadInfo.append(Offset.min, SeqNr.min, None)
89
    * val res1: HeadInfo = Append(0,1,None)
90
    * }}}
91
    *
92
    * @see [[HeadInfo.Append]] for more details.
93
    */
94
  def append(offset: Offset, seqNr: SeqNr, deleteTo: Option[DeleteTo]): HeadInfo = {
95
    Append(offset, seqNr, deleteTo)
1✔
96
  }
97

98
  /** Calls [[HeadInfo#apply]] until all incoming actions are handled */
99
  def apply[F[_]: Foldable](actions: F[(ActionHeader, Offset)]): HeadInfo = {
100
    actions.foldLeft(empty) { case (head, (header, offset)) => head(header, offset) }
×
101
  }
102

103
  /** There are no non-replicated events in Kafka for specific journal.
104
    *
105
    * Having this state means it is safe to assume the journal is fully
106
    * replicated to a storage (i.e. Cassandra).
107
    *
108
    * In other words, one can read it from Cassandra only and not look into
109
    * Kafka.
110
    */
111
  final case object Empty extends HeadInfo
112

113
  /** There are new non-replicated events in Kafka for specific journal.
114
    *
115
    * Having this state means it is not enough to read Cassandra to get a full
116
    * journal. One has to read Kafka also.
117
    */
118
  abstract sealed class NonEmpty extends HeadInfo
119

120
  object NonEmpty {
121

122
    implicit val semigroupNonEmpty: Semigroup[NonEmpty] = { (a: NonEmpty, b: NonEmpty) =>
123
      {
124

125
        def onDelete(a: Append, b: Delete) = {
126
          val deleteTo = a.deleteTo.fold(b.deleteTo) { _ max b.deleteTo }
×
127
          Append(a.offset, a.seqNr, deleteTo.some)
1✔
128
        }
129

130
        (a, b) match {
131
          case (a: Append, b: Append) => Append(a.offset min b.offset, a.seqNr max b.seqNr, a.deleteTo max b.deleteTo)
1✔
132
          case (a: Append, b: Delete) => onDelete(a, b)
1✔
133
          case (_: Append, Purge)     => Purge
1✔
134
          case (a: Delete, b: Append) => onDelete(b, a)
×
135
          case (a: Delete, b: Delete) => Delete(a.deleteTo max b.deleteTo)
×
UNCOV
136
          case (_: Delete, Purge)     => Purge
×
137
          case (Purge, b: Append)     => b
138
          case (Purge, b: Delete)     => b
139
          case (Purge, Purge)         => Purge
×
140
        }
141
      }
142
    }
143
  }
144

145
  /** There are new appended events in Kafka, which did not replicate yet.
146
    *
147
    * The fields will be located like following inside of the Kafka topic
148
    * partition:
149
    * {{{
150
    * [offset][deleted events][deleteTo][non-replicated events][seqNr]
151
    * }}}
152
    *
153
    * @param offset
154
    *   [[Offset]] of the _first_ non-replicated event, which might be,
155
    *   optionally, deleted.
156
    * @param seqNr
157
    *   [[SeqNr]] of the _last_ non-replicated event.
158
    * @param deleteTo
159
    *   [[SeqNr]] of the _last_ deleted event, if any.
160
    */
161
  final case class Append(
162
    offset: Offset,
163
    seqNr: SeqNr,
164
    deleteTo: Option[DeleteTo],
165
  ) extends NonEmpty
166

167
  /** The only non-replicated records are delete actions.
168
    *
169
    * The events itself already replicated to Cassandra, so it should be enough
170
    * to read them from Cassandra starting from the first non-deleted event,
171
    * i.e. there is no need to read Kafka in this case.
172
    *
173
    * The `deleteTo` field will be located like following inside of the Kafka
174
    * topic partition:
175
    * {{{
176
    * [deleted events][deleteTo][replicated events]
177
    * }}}
178
    *
179
    * @param deleteTo
180
    *   [[SeqNr]] of the _last_ deleted event.
181
    */
182
  final case class Delete(
183
    deleteTo: DeleteTo,
184
  ) extends NonEmpty
185

186
  /** The last non-replicated record was a journal purge action.
187
    *
188
    * It means there is no need to read either Cassandra or Kafka as journal was
189
    * purged.
190
    */
191
  final case object Purge extends NonEmpty
192

193
  implicit class HeadInfoOps(val self: HeadInfo) extends AnyVal {
194

195
    /** A journal tail was dropped.
196
      *
197
      * We update the range of deleted non-replicated events.
198
      */
199
    def delete(to: DeleteTo): HeadInfo = {
200

201
      def onAppend(self: Append) = {
202
        val deleteTo = self
203
          .deleteTo
204
          .fold(to) { _ max to }
1✔
205
          .min(self.seqNr.toDeleteTo)
2✔
206
        self.copy(seqNr = self.seqNr, deleteTo = deleteTo.some)
2✔
207
      }
208

209
      def delete = Delete(to)
2✔
210

211
      self match {
2✔
212
        case a: Append => onAppend(a)
2✔
213
        case a: Delete => Delete(a.deleteTo max to)
1✔
214
        case Empty     => delete
2✔
215
        case Purge     => delete
×
216
      }
217
    }
218

219
    /** A new event was appended.
220
      *
221
      * We update last saved [[SeqNr]], but ignore [[Offset]] unless this is a
222
      * first event in our list of non-replicted events.
223
      */
224
    def append(range: SeqRange, offset: Offset): HeadInfo = {
225

226
      def deleteToOf(deleteTo: DeleteTo) = range
227
        .from
1✔
228
        .prev[Option]
1✔
229
        .map { _.toDeleteTo min deleteTo }
1✔
230

231
      def append = Append(offset, range.to, none)
2✔
232

233
      self match {
2✔
234
        case a: Append => a.copy(seqNr = range.to)
2✔
235
        case a: Delete => Append(offset, range.to, deleteToOf(a.deleteTo))
1✔
236
        case Empty     => append
2✔
237
        case Purge     => append
1✔
238
      }
239
    }
240

241
    /** A journal marker was found, which does not change [[HeadInfo]] state.
242
      *
243
      * The marker only matters during recovery, to validate that all journal
244
      * events were read. There is no, currently, a reason to track if markers
245
      * made by previous recoveries did replicate.
246
      */
247
    def mark: HeadInfo = self
2✔
248

249
    /** A journal purge was performed, all previous events could be ignored */
250
    def purge: HeadInfo = Purge
2✔
251

252
    /** Update [[HeadInfo]] with a next event coming from Kafka.
253
      *
254
      * Note, that no actual event body is required, only [[ActionHeader]] and
255
      * [[Offset]] of the new record.
256
      */
257
    def apply(header: ActionHeader, offset: Offset): HeadInfo = {
258
      header match {
259
        case a: ActionHeader.Append => append(a.range, offset)
2✔
260
        case _: ActionHeader.Mark   => mark
2✔
261
        case a: ActionHeader.Delete => delete(a.to)
2✔
262
        case _: ActionHeader.Purge  => purge
2✔
263
      }
264
    }
265
  }
266
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc