
evolution-gaming / kafka-journal · build 14645650020

24 Apr 2025 03:34PM UTC · coverage: 94.106% (+0.08%) from 94.024%

Pull Request #760: Update pureconfig-cats, pureconfig-core, ... to 0.17.9
web-flow: Merge 5b4eadf6e into 350691e40

5269 of 5599 relevant lines covered (94.11%)

1.66 hits per line

Source File

98.7% of lines covered
/replicator/src/main/scala/com/evolutiongaming/kafka/journal/replicator/TopicReplicator.scala
package com.evolutiongaming.kafka.journal.replicator

import cats.Monad
import cats.data.{NonEmptyList as Nel, NonEmptyMap as Nem, NonEmptySet as Nes}
import cats.effect.*
import cats.effect.implicits.*
import cats.implicits.*
import com.evolutiongaming.catshelper.*
import com.evolutiongaming.catshelper.ClockHelper.*
import com.evolutiongaming.catshelper.ParallelHelper.*
import com.evolutiongaming.kafka.journal.*
import com.evolutiongaming.kafka.journal.conversions.{ConsRecordToActionRecord, KafkaRead}
import com.evolutiongaming.kafka.journal.eventual.*
import com.evolutiongaming.kafka.journal.util.Fail
import com.evolutiongaming.kafka.journal.util.SkafkaHelper.*
import com.evolutiongaming.retry.Sleep
import com.evolutiongaming.skafka.*
import com.evolutiongaming.skafka.consumer.{AutoOffsetReset, ConsumerConfig}
import scodec.bits.ByteVector

import java.time.Instant
import scala.concurrent.duration.*
import scala.util.Try

/**
 * Consumes the Kafka topic, splits the data stream into [[PartitionFlow]]s and splits each
 * per-partition stream into [[KeyFlow]]s.
 *
 * Basically:
 *   - the result of each Kafka `poll` is grouped per partition and key
 *   - the per-key groups of records are processed by [[ReplicateRecords]]
 */
private[journal] object TopicReplicator {

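  /**
   * Creates the replicator for `topic` as a [[Resource]]: summons the [[Payload]] codecs,
   * wires a topic-prefixed logger and starts consumption in the background; the inner `F`
   * completes with the outcome of that background fiber.
   */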
  def make[F[_]: Concurrent: Sleep: ToTry: LogOf: Fail: MeasureDuration: JsonCodec](
    topic: Topic,
    journal: ReplicatedJournal[F],
    consumer: Resource[F, TopicConsumer[F]],
    metrics: TopicReplicatorMetrics[F],
    cacheOf: CacheOf[F],
    replicatedOffsetNotifier: ReplicatedOffsetNotifier[F],
  ): Resource[F, F[Outcome[F, Throwable, Unit]]] = {

    implicit val fromAttempt: FromAttempt[F] = FromAttempt.lift[F]
    implicit val fromJsResult: FromJsResult[F] = FromJsResult.lift[F]
    implicit val jsonCodec: JsonCodec[Try] = JsonCodec.summon[F].mapK(ToTry.functionK)

    val kafkaRead = KafkaRead.summon[F, Payload]
    val eventualWrite = EventualWrite.summon[F, Payload]

    def consume(
      consumer: Resource[F, TopicConsumer[F]],
      log: Log[F],
    ) = {

      val consRecordToActionRecord = ConsRecordToActionRecord[F]
      of(
        topic = topic,
        consumer = consumer,
        consRecordToActionRecord = consRecordToActionRecord,
        kafkaRead = kafkaRead,
        eventualWrite = eventualWrite,
        journal = journal,
        metrics = metrics,
        log = log,
        cacheOf = cacheOf,
        replicatedOffsetNotifier = replicatedOffsetNotifier,
      )
    }

    for {
      log <- topicLoggerOf(topic).toResource
      done <- consume(consumer, log).background
    } yield done
  }

  private def topicLoggerOf[F[_]: LogOf: Monad](topic: Topic): F[Log[F]] = {
    LogOf[F].apply(TopicReplicator.getClass).map(_ prefixed topic)
  }

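  /**
   * Runs replication of `topic` until consumption completes: every poll result is processed
   * in parallel per partition, records at or below the already replicated offset are skipped,
   * and the remainder is handed to [[ReplicateRecords]] key by key.
   */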
  def of[F[_]: Concurrent: Sleep: MeasureDuration, A](
    topic: Topic,
    consumer: Resource[F, TopicConsumer[F]],
    consRecordToActionRecord: ConsRecordToActionRecord[F],
    kafkaRead: KafkaRead[F, A],
    eventualWrite: EventualWrite[F, A],
    journal: ReplicatedJournal[F],
    metrics: TopicReplicatorMetrics[F],
    log: Log[F],
    cacheOf: CacheOf[F],
    replicatedOffsetNotifier: ReplicatedOffsetNotifier[F],
  ): F[Unit] = {

    trait PartitionFlow {
      def apply(timestamp: Instant, records: Nel[ConsRecord]): F[Unit]
    }

    trait KeyFlow {
      def apply(timestamp: Instant, records: Nel[ConsRecord]): F[Int]
    }

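    // one TopicFlow per consumed topic: holds the topic's journal and a cache with one
    // PartitionFlow per partition, pruned again when partitions are revoked or lost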
    val topicFlowOf: TopicFlowOf[F] = { (topic: Topic) =>
      {
        for {
          journal <- journal.journal(topic)
          cache <- cacheOf[Partition, PartitionFlow](topic)
        } yield {

          def remove(partitions: Nes[Partition]) = {
            partitions.parFoldMap1 { partition => cache.remove(partition) }
          }

          new TopicFlow[F] {

            def assign(partitions: Nes[Partition]) = {
              log.info(s"assign ${ partitions.mkString_(",") }")
            }

            def apply(records: Nem[Partition, Nel[ConsRecord]]) = {
              for {
                duration <- MeasureDuration[F].start
                timestamp <- Clock[F].instant
                _ <- records.parFoldMap1 {
                  case (partition, records) =>
                    for {
                      partitionFlow <- cache.getOrUpdate(partition) {
                        for {
                          journal <- journal(partition)
                          offsets = journal.offsets
                          offsetRef <- Resource.eval {
                            for {
                              offset <- offsets.get
                              ref <- Ref.of(offset)
                            } yield ref
                          }
                          cache <- cacheOf[String, KeyFlow](topic)
                        } yield { (timestamp: Instant, records: Nel[ConsRecord]) =>
                          {
                            for {
                              offset <- offsetRef.get
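                              // keep only records above the already replicated offset;
                              // anything else was written by a previous pass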
                              _ <- offset
                                .fold {
                                  records.some
                                } { offset =>
                                  records.filter { _.offset > offset }.toNel
                                }
                                .foldMapM { records =>
                                  records
                                    .groupBy { _.key.map { _.value } }
                                    .parFoldMap1 {
                                      case (key, records) =>
                                        key.foldMapM { key =>
                                          for {
                                            keyFlow <- cache.getOrUpdate(key) {
                                              journal
                                                .journal(key)
                                                .map { journal =>
                                                  val replicateRecords = ReplicateRecords(
                                                    consRecordToActionRecord,
                                                    journal,
                                                    metrics,
                                                    kafkaRead,
                                                    eventualWrite,
                                                    log,
                                                  )
                                                  (timestamp: Instant, records: Nel[ConsRecord]) => {
                                                    replicateRecords(records, timestamp)
                                                  }
                                                }
                                            }
                                            result <- keyFlow(timestamp, records)
                                          } yield result
                                        }
                                    }
                                }
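                              // advance the stored offset only if this batch moved it forward:
                              // create the row on first contact with the partition, update it
                              // otherwise, then notify about the newly replicated offset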
                              result <- {
                                val offset1 = records.maximumBy { _.offset }.offset

                                def setAndNotify = offsetRef.set(offset1.some) >>
                                  replicatedOffsetNotifier.onReplicatedOffset(TopicPartition(topic, partition), offset1)

                                offset.fold {
                                  for {
                                    a <- offsets.create(offset1, timestamp)
                                    _ <- setAndNotify
                                  } yield a
                                } { offset =>
                                  if (offset1 > offset) {
                                    for {
                                      a <- offsets.update(offset1, timestamp)
                                      _ <- setAndNotify
                                    } yield a
                                  } else {
                                    ().pure[F]
                                  }
                                }
                              }
                            } yield result
                          }
                        }
                      }
                      result <- partitionFlow(timestamp, records)
                    } yield result
                }
                duration <- duration
                _ <- metrics.round(duration, records.foldLeft(0) { _ + _.size })
              } yield {
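                // per partition, return the offset right after the highest one seen,
                // i.e. the next offset to consume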
                records.map { records =>
                  records.foldLeft { Offset.min } { (offset, record) =>
                    record
                      .offset
                      .inc[Try]
                      .fold(_ => offset, _ max offset)
                  }
                }.toSortedMap
              }
            }

            def revoke(partitions: Nes[Partition]) = {
              for {
                _ <- log.info(s"revoke ${ partitions.mkString_(",") }")
                a <- remove(partitions)
              } yield a
            }

            def lose(partitions: Nes[Partition]) = {
              for {
                _ <- log.info(s"lose ${ partitions.mkString_(",") }")
                a <- remove(partitions)
              } yield a
            }
          }
        }
      }
    }

    ConsumeTopic(topic, consumer, topicFlowOf, log)
  }

  object ConsumerOf {

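    /**
     * Creates a [[TopicConsumer]] tuned for replication: group and client ids are derived
     * from the config (defaulting to "replicator") and the host name, offsets are reset to
     * the earliest available, and auto-commit is disabled in favour of periodic commits.
     */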
    def make[F[_]: Temporal: KafkaConsumerOf: FromTry: LogOf](
      topic: Topic,
      config: ConsumerConfig,
      pollTimeout: FiniteDuration,
      hostName: Option[HostName],
    ): Resource[F, TopicConsumer[F]] = {

      val groupId = {
        val prefix = config.groupId getOrElse "replicator"
        s"$prefix-$topic"
      }

      val common = config.common

      val clientId = {
        val clientId = common.clientId getOrElse "replicator"
        hostName.fold(clientId) { hostName => s"$clientId-$hostName" }
      }

      val config1 = config.copy(
        common = common.copy(clientId = clientId.some),
        groupId = groupId.some,
        autoOffsetReset = AutoOffsetReset.Earliest,
        autoCommit = false,
      )

      for {
        consumer <- KafkaConsumerOf[F].apply[String, ByteVector](config1)
        log <- topicLoggerOf[F](topic).toResource
        metadata = hostName.fold { Metadata.empty } { _.value }
        commit <- TopicCommit.asyncPeriodic(
          topic = topic,
          commitMetadata = metadata,
          commitPeriod = 5.seconds,
          consumer = consumer,
          log = log,
        )
      } yield {
        TopicConsumer(topic, pollTimeout, commit, consumer)
      }
    }
  }
}
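
For orientation, a minimal usage sketch: a caller owning a ReplicatedJournal, a consumer
resource, metrics, a cache factory and an offset notifier could start the replicator as below.
The function name and the topic value are illustrative only, not part of the library.

  // hypothetical wiring; all arguments are assumed to be provided by the application
  def startReplicator[F[_]: Concurrent: Sleep: ToTry: LogOf: Fail: MeasureDuration: JsonCodec](
    journal: ReplicatedJournal[F],
    consumer: Resource[F, TopicConsumer[F]],
    metrics: TopicReplicatorMetrics[F],
    cacheOf: CacheOf[F],
    notifier: ReplicatedOffsetNotifier[F],
  ): Resource[F, F[Outcome[F, Throwable, Unit]]] =
    TopicReplicator.make[F](
      topic = "journal", // illustrative topic name
      journal = journal,
      consumer = consumer,
      metrics = metrics,
      cacheOf = cacheOf,
      replicatedOffsetNotifier = notifier,
    )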