• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

evolution-gaming / kafka-journal / 11837553250

14 Nov 2024 12:50PM CUT coverage: 83.298% (+0.2%) from 83.145%
11837553250

Pull #697

web-flow
Merge 1de7acaf5 into 916090a2c
Pull Request #697: make `version` non-optional

7 of 10 new or added lines in 5 files covered. (70.0%)

7 existing lines in 7 files now uncovered.

3167 of 3802 relevant lines covered (83.3%)

1.41 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.48
/replicator/src/main/scala/com/evolutiongaming/kafka/journal/replicator/ReplicateRecords.scala
1
package com.evolutiongaming.kafka.journal.replicator
2

3
import cats.data.NonEmptyList as Nel
4
import cats.effect.*
5
import cats.syntax.all.*
6
import com.evolutiongaming.catshelper.ClockHelper.*
7
import com.evolutiongaming.catshelper.{BracketThrowable, Log}
8
import com.evolutiongaming.kafka.journal.*
9
import com.evolutiongaming.kafka.journal.conversions.{ConsRecordToActionRecord, KafkaRead}
10
import com.evolutiongaming.kafka.journal.eventual.*
11
import com.evolutiongaming.kafka.journal.util.TemporalHelper.*
12
import com.evolutiongaming.skafka.Offset
13

14
import java.time.Instant
15
import scala.concurrent.duration.FiniteDuration
16

17
/**
18
 * Gets a list of per-key records from [[TopicReplicator]], groups them in [[Batch]]es and replicates each batch
19
 * to Cassandra using [[ReplicatedKeyJournal]].
20
 */
21
private[journal] trait ReplicateRecords[F[_]] {
22

23
  def apply(records: Nel[ConsRecord], timestamp: Instant): F[Int]
24
}
25

26
private[journal] object ReplicateRecords {
27

28
  def apply[F[_]: BracketThrowable: Clock, A](
29
    consRecordToActionRecord: ConsRecordToActionRecord[F],
30
    journal: ReplicatedKeyJournal[F],
31
    metrics: TopicReplicatorMetrics[F],
32
    kafkaRead: KafkaRead[F, A],
33
    eventualWrite: EventualWrite[F, A],
34
    log: Log[F],
35
  ): ReplicateRecords[F] = { (records: Nel[ConsRecord], timestamp: Instant) =>
36
    {
37

38
      def apply(records: Nel[ActionRecord[Action]]): F[Int] = {
39
        val record    = records.last
1✔
40
        val key       = record.action.key
1✔
41
        val partition = record.partitionOffset.partition
1✔
42
        val id        = key.id
1✔
43

44
        def measurements(records: Int): F[TopicReplicatorMetrics.Measurements] = {
45
          for {
46
            now <- Clock[F].instant
1✔
47
          } yield {
48
            val timestamp1 = record.action.timestamp
1✔
49
            TopicReplicatorMetrics.Measurements(
1✔
50
              replicationLatency = now diff timestamp1,
1✔
51
              deliveryLatency    = timestamp diff timestamp1,
1✔
52
              records            = records,
53
            )
54
          }
55
        }
56

57
        def append(offset: Offset, records: Nel[ActionRecord[Action.Append]]): F[Int] = {
58
          val bytes = records.foldLeft(0L) { case (bytes, record) => bytes + record.action.payload.size }
1✔
59

60
          def msg(
61
            events: Nel[EventRecord[EventualPayloadAndType]],
62
            latency: FiniteDuration,
63
            expireAfter: Option[ExpireAfter],
64
          ): String = {
65
            val seqNrs =
66
              if (events.tail.isEmpty) s"seqNr: ${events.head.seqNr}"
×
67
              else s"seqNrs: ${events.head.seqNr}..${events.last.seqNr}"
×
68
            val origin         = records.head.action.origin
×
69
            val originStr      = origin.foldMap { origin => s", origin: $origin" }
×
70
            val version        = records.last.action.version
×
71
            val expireAfterStr = expireAfter.foldMap { expireAfter => s", expireAfter: $expireAfter" }
×
72
            s"append in ${latency.toMillis}ms, id: $id, partition: $partition, offset: $offset, $seqNrs$originStr, version: $version$expireAfterStr"
73
          }
74

75
          def measure(events: Nel[EventRecord[EventualPayloadAndType]], expireAfter: Option[ExpireAfter]): F[Unit] = {
76
            for {
77
              measurements <- measurements(records.size)
1✔
78
              version       = events.last.version
1✔
79
              expiration    = expireAfter.map(_.value.toString).getOrElse("none")
1✔
80
              result <- metrics.append(
1✔
81
                events        = events.length,
1✔
82
                bytes         = bytes,
83
                clientVersion = version.value,
1✔
84
                expiration    = expiration,
85
                measurements  = measurements,
86
              )
87
              _ <- log.info(msg(events, measurements.replicationLatency, expireAfter))
1✔
88
            } yield result
1✔
89
          }
90

91
          for {
92
            events <- records.flatTraverse { record =>
1✔
93
              val action         = record.action
1✔
94
              val payloadAndType = action.toPayloadAndType
1✔
95
              for {
96
                events <- kafkaRead(payloadAndType).adaptError {
1✔
97
                  case e =>
98
                    JournalError(s"ReplicateRecords failed for id: $id, partition: $partition, offset: $offset: $e", e)
99
                }
100
                eventualEvents <- events.events.traverse { _.traverse { a => eventualWrite(a) } }
1✔
101
              } yield for {
102
                event <- eventualEvents
1✔
103
              } yield {
104
                EventRecord(record, event, events.metadata)
1✔
105
              }
106
            }
107
            expireAfter = events.last.metadata.payload.expireAfter
1✔
108
            result     <- journal.append(offset, timestamp, expireAfter, events)
1✔
109
            result     <- if (result) measure(events, expireAfter).as(events.size) else 0.pure[F]
1✔
110
          } yield result
111
        }
112

113
        def delete(offset: Offset, deleteTo: DeleteTo, origin: Option[Origin], version: Version): F[Int] = {
114

115
          def msg(latency: FiniteDuration): String = {
NEW
116
            val originStr = origin.foldMap { origin => s", origin: $origin" }
×
117
            s"delete in ${latency.toMillis}ms, id: $id, offset: $partition:$offset, deleteTo: $deleteTo$originStr, version: $version"
118
          }
119

120
          def measure(): F[Unit] = {
121
            for {
122
              measurements <- measurements(1)
1✔
123
              latency       = measurements.replicationLatency
1✔
124
              _            <- metrics.delete(measurements)
1✔
UNCOV
125
              result       <- log.info(msg(latency))
×
126
            } yield result
127
          }
128

129
          for {
130
            result <- journal.delete(offset, timestamp, deleteTo, origin)
1✔
131
            result <- if (result) measure().as(1) else 0.pure[F]
1✔
132
          } yield result
133
        }
134

135
        def purge(offset: Offset, origin: Option[Origin], version: Version): F[Int] = {
136

137
          def msg(latency: FiniteDuration): String = {
NEW
138
            val originStr = origin.foldMap { origin => s", origin: $origin" }
×
139
            s"purge in ${latency.toMillis}ms, id: $id, offset: $partition:$offset$originStr, version: $version"
140
          }
141

142
          def measure(): F[Unit] = {
143
            for {
144
              measurements <- measurements(1)
1✔
145
              latency       = measurements.replicationLatency
1✔
146
              _            <- metrics.purge(measurements)
1✔
147
              result       <- log.info(msg(latency))
1✔
148
            } yield result
149
          }
150

151
          for {
152
            result <- journal.purge(offset, timestamp)
1✔
153
            result <- if (result) measure().as(1) else 0.pure[F]
1✔
154
          } yield result
155
        }
156

157
        Batch
158
          .of(records)
1✔
159
          .foldMapM {
1✔
160
            case batch: Batch.Appends => append(batch.offset, batch.records)
1✔
161
            case batch: Batch.Delete  => delete(batch.offset, batch.to, batch.origin, batch.version)
1✔
162
            case batch: Batch.Purge   => purge(batch.offset, batch.origin, batch.version)
1✔
163
          }
164
      }
165

166
      for {
167
        records <- records
1✔
168
          .toList
169
          .traverseFilter { record => consRecordToActionRecord(record) }
1✔
170
        result <- records
1✔
171
          .toNel
1✔
172
          .foldMapM { records => apply(records) }
1✔
173
      } yield result
174
    }
175
  }
176
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc