• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

evolution-gaming / kafka-journal / 11837553250

14 Nov 2024 12:50PM CUT coverage: 83.298% (+0.2%) from 83.145%
11837553250

Pull #697

web-flow
Merge 1de7acaf5 into 916090a2c
Pull Request #697: make `version` non-optional

7 of 10 new or added lines in 5 files covered. (70.0%)

7 existing lines in 7 files now uncovered.

3167 of 3802 relevant lines covered (83.3%)

1.41 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.83
/replicator/src/main/scala/com/evolutiongaming/kafka/journal/replicator/Batch.scala
1
package com.evolutiongaming.kafka.journal.replicator
2

3
import cats.data.NonEmptyList
4
import cats.syntax.all.*
5
import com.evolutiongaming.kafka.journal.*
6
import com.evolutiongaming.skafka.Offset
7

8
/**
9
 * Receives a list of records from [[ReplicateRecords]], then groups and optimizes similar or sequential actions, e.g.:
10
 *  - two or more `append` or `delete` actions are merged into one
11
 *  - optimizes list of records to minimize load on Cassandra, like:
12
 *    - aggregate effective actions by processing them from the _youngest_ record down to oldest
13
 *    - ignore/drop all actions that precede the last `Purge`
14
 *    - ignore all `Mark` actions
15
 *    - if `append`(s) are followed by a `delete`, all `append`(s) except the last are dropped
16
 *  Our goal is to minimize load on Cassandra while preserving original offset ordering.
17
 */
18
private[journal] sealed abstract class Batch extends Product {
19

20
  def offset: Offset
21
}
22

23
private[journal] object Batch {
24

25
  def of(records: NonEmptyList[ActionRecord[Action]]): List[Batch] = {
26
    // reverse list of records to process them from the youngest record down to oldest
27
    records
28
      .reverse
29
      .foldLeft { State.empty } { State.fold }
1✔
30
      .batches
1✔
31
  }
32

33
  /**
34
    * Internal state used to collapse actions (aka Kafka records) into batches.
35
    * Each batch represents an action to be applied to Cassandra: `appends`, `delete` or `purge`.
36
    *
37
    * @param batches list of batches, where head is the _oldest_ batch
38
    */
39
  private case class State(batches: List[Batch]) {
40

41
    /**
42
      * Oldest batch from the state.
43
      * 
44
      * Actions are processed from the _youngest_ record down to oldest, so `state.next` on each step represents batch that follows the current action:
45
      * {{{
46
      * // a<x> - action #x
47
      * // b<x> - batch #x
48
      * [a1,a2,a3,a4,a5,a6,a7] // actions
49
      * [b6,b7] // state.batches after processing actions a7 & a6
50
      * state.next == b6 // while processing a5
51
      * }}}  
52
      */
53
    def next: Option[Batch] = batches.headOption
1✔
54

55
    /**
56
      * Find _oldest_ delete batch in the state.
57
      */
58
    def delete: Option[Delete] = batches.collectFirst { case d: Delete => d }
1✔
59

60
    /**
61
      * Add new batch to the state as the _oldest_ one.
62
      */
63
    def prepend(batch: Batch): State = new State(batch :: batches)
1✔
64

65
    /**
66
      * Replace _oldest_ batch in the state with new one.
67
      */
68
    def replace(batch: Batch): State = new State(batch :: batches.tail)
1✔
69

70
  }
71

72
  private object State {
73

74
    val empty: State = State(List.empty)
1✔
75

76
    def fold(state: State, event: ActionRecord[Action]): State = event match {
77

78
      case ActionRecord(_: Action.Mark, _) => state
2✔
79

80
      case ActionRecord(purge: Action.Purge, partitionOffset: PartitionOffset) =>
2✔
81
        def purgeBatch = Purge(partitionOffset.offset, purge.origin, purge.version)
2✔
82

83
        state.next match {
2✔
84
          case Some(_: Purge) => state
1✔
85
          case Some(_)        => state.prepend(purgeBatch)
1✔
86
          case None           => state.prepend(purgeBatch)
2✔
87
        }
88

89
      case ActionRecord(delete: Action.Delete, partitionOffset: PartitionOffset) =>
2✔
90
        def deleteBatch(delete: Action.Delete) =
91
          Delete(partitionOffset.offset, delete.to, delete.origin, delete.version)
2✔
92

93
        state.next match {
2✔
94

95
          case Some(_: Purge) => state
2✔
96
          case None           => state.prepend(deleteBatch(delete))
2✔
97

98
          // Action is `delete` and next batch is `appends`.
99
          // The state's batches may already contain another `delete` that removes the same range (or more)
100
          // as the incoming `delete` action - in that case the incoming `delete` is ignored.
101
          case Some(_: Appends) =>
1✔
102
            // If `delete` is already covered by `state.delete` then ignore it
103
            val delete1 = state.delete match {
1✔
104
              case None          => delete.some
1✔
UNCOV
105
              case Some(delete1) => if (delete1.to.value < delete.to.value) delete.some else None
×
106
            }
107
            delete1 match {
108
              case Some(delete) => state.prepend(deleteBatch(delete))
1✔
109
              case None         => state
1✔
110
            }
111

112
          case Some(next: Delete) =>
113
            if (delete.header.to.value < next.to.value) state
1✔
114
            // If `delete` includes `next` then replace `next` with `delete`
115
            else state.replace(deleteBatch(delete))
1✔
116
        }
117

118
      case ActionRecord(append: Action.Append, partitionOffset: PartitionOffset) =>
119
        state.next match {
1✔
120

121
          case Some(_: Purge) => state
1✔
122

123
          case Some(next: Appends) =>
1✔
124
            val append1 =
125
              // if `append` is already deleted by `state.delete` then ignore it
126
              state.delete match {
1✔
127
                case Some(delete) => if (delete.to.value < append.range.to) append.some else None
1✔
128
                case None         => append.some
1✔
129
              }
130

131
            append1 match {
132
              case Some(append) =>
1✔
133
                val record  = ActionRecord(append, partitionOffset)
1✔
134
                val appends = Appends(next.offset, record :: next.records)
1✔
135
                // replace head (aka [state.next]) with new Appends, i.e. merge `append` with `next`
136
                state.replace(appends)
1✔
137

138
              case None => state
1✔
139
            }
140

141
          case Some(next: Delete) =>
2✔
142
            val record  = ActionRecord(append, partitionOffset)
2✔
143
            val appends = Appends(partitionOffset.offset, NonEmptyList.one(record))
2✔
144
            state.prepend(appends)
2✔
145

146
          case None =>
1✔
147
            val record  = ActionRecord(append, partitionOffset)
1✔
148
            val appends = Appends(partitionOffset.offset, NonEmptyList.one(record))
1✔
149
            state.prepend(appends)
1✔
150
        }
151
    }
152

153
  }
154

155
  final case class Appends(
156
    offset: Offset,
157
    records: NonEmptyList[ActionRecord[Action.Append]],
158
  ) extends Batch
159

160
  final case class Delete(
161
    offset: Offset,
162
    to: DeleteTo,
163
    origin: Option[Origin],
164
    version: Version,
165
  ) extends Batch
166

167
  final case class Purge(
168
    offset: Offset,
169
    origin: Option[Origin], // used only for logging
170
    version: Version, // used only for logging
171
  ) extends Batch
172
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc