• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

TouK / nussknacker / 5976637142

25 Aug 2023 01:43PM UTC coverage: 81.47% (+0.03%) from 81.438%
5976637142

push

github

Filemon279
Fix migration in 1.11

25 of 25 new or added lines in 2 files covered. (100.0%)

14865 of 18246 relevant lines covered (81.47%)

5.62 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

74.67
/utils/kafka-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/KafkaUtils.scala
1
package pl.touk.nussknacker.engine.kafka
2

3
import com.typesafe.scalalogging.LazyLogging
4
import org.apache.kafka.clients.KafkaClient
5
import org.apache.kafka.clients.admin.{Admin, AdminClient}
6
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer}
7
import org.apache.kafka.clients.producer.{Callback, Producer, ProducerRecord, RecordMetadata}
8
import org.apache.kafka.common.{IsolationLevel, TopicPartition}
9
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer}
10
import pl.touk.nussknacker.engine.util.ThreadUtils
11

12
import java.time
13
import java.util.concurrent.TimeUnit
14
import java.util.{Collections, Properties}
15
import scala.collection.mutable.ArrayBuffer
16
import scala.concurrent.duration.Duration
17
import scala.concurrent.{Await, Future, Promise}
18
import scala.util.Using.Releasable
19
import scala.util.{Failure, Success, Using}
20

21
object KafkaUtils extends KafkaUtils
22

23
trait KafkaUtils extends LazyLogging {
24

25
  import scala.concurrent.ExecutionContext.Implicits.global
26
  import scala.jdk.CollectionConverters._
27

28
  val defaultTimeoutMillis = 10000
8✔
29

30
  def setClientId(props: Properties, id: String): Unit = {
31
    props.setProperty("client.id", sanitizeClientId(id))
2✔
32
  }
33

34
  def createKafkaAdminClient(kafkaConfig: KafkaConfig): Admin = {
35
    AdminClient.create(withPropertiesFromConfig(new Properties, kafkaConfig))
6✔
36
  }
37

38
  def usingAdminClient[T](kafkaConfig: KafkaConfig)(adminClientOperation: Admin => T): T = {
39
    //we don't use default close not to block indefinitely
40
    val releasable = new Releasable[Admin] {
4✔
41
      override def release(resource: Admin): Unit = resource.close(time.Duration.ofMillis(defaultTimeoutMillis))
4✔
42
    }
43
    Using.resource(createKafkaAdminClient(kafkaConfig))(adminClientOperation)(releasable)
4✔
44
  }
45

46
  def sanitizeClientId(originalId: String): String =
47
    //https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/common/Config.scala#L25-L35
48
    originalId.replaceAll("[^a-zA-Z0-9\\._\\-]", "_")
4✔
49

50
  def setToLatestOffsetIfNeeded(config: KafkaConfig, topic: String, consumerGroupId: String): Unit = {
51
    val setToLatestOffset = config.forceLatestRead.contains(true)
8✔
52
    if (setToLatestOffset) {
8✔
53
      KafkaUtils.setOffsetToLatest(topic, consumerGroupId, config)
×
54
    }
55
  }
56

57
  def setOffsetToLatest(topic: String, groupId: String, config: KafkaConfig): Unit = {
58
    val timeoutMillis = readTimeoutForTempConsumer(config)
×
59
    logger.info(s"Setting offset to latest for topic: $topic, groupId: $groupId")
60
    val consumerAfterWork = Future {
×
61
      doWithTempKafkaConsumer(config, Some(groupId)) { consumer =>
×
62
        setOffsetToLatest(topic, consumer)
×
63
      }
64
    }
65
    Await.result(consumerAfterWork, Duration.apply(timeoutMillis, TimeUnit.MILLISECONDS))
×
66
  }
67

68
  def toProducerProperties(config: KafkaConfig, clientId: String): Properties = {
69
    val props: Properties = new Properties
8✔
70
    props.put("key.serializer", classOf[ByteArraySerializer])
8✔
71
    props.put("value.serializer", classOf[ByteArraySerializer])
8✔
72
    setClientId(props, clientId)
8✔
73
    withPropertiesFromConfig(props, config)
8✔
74
  }
75

76
  private def withPropertiesFromConfig(defaults: Properties, kafkaConfig: KafkaConfig): Properties = {
77
    val props = new Properties()
10✔
78
    defaults.forEach((k, v) => props.put(k, v))
8✔
79
    kafkaConfig.kafkaAddress.foreach { kafkaAddress =>
10✔
80
      props.put("bootstrap.servers", kafkaAddress)
×
81
    }
82
    kafkaConfig.kafkaProperties.getOrElse(Map.empty).foreach { case (k, v) =>
×
83
      props.put(k, v)
10✔
84
    }
85
    props
86
  }
87

88

89
  def toConsumerProperties(config: KafkaConfig, groupId: Option[String]): Properties = {
90
    val props = new Properties()
10✔
91
    props.put("value.deserializer", classOf[ByteArrayDeserializer])
10✔
92
    props.put("key.deserializer", classOf[ByteArrayDeserializer])
10✔
93
    groupId.foreach(props.setProperty("group.id", _))
10✔
94
    withPropertiesFromConfig(props, config)
10✔
95
  }
96

97
  def readLastMessages(topic: String, size: Int, config: KafkaConfig) : List[ConsumerRecord[Array[Byte], Array[Byte]]] = {
98
    doWithTempKafkaConsumer(config, None) { consumer =>
6✔
99
      try {
100
        consumer.partitionsFor(topic).asScala.map(no => new TopicPartition(topic, no.partition())).view.flatMap { tp =>
6✔
101
          val partitions = Collections.singletonList(tp)
6✔
102
          consumer.assign(partitions)
6✔
103
          consumer.seekToEnd(partitions)
6✔
104
          val lastOffset = consumer.position(tp)
6✔
105
          val offsetToSearch = Math.max(0, lastOffset - size)
6✔
106
          consumer.seek(tp, offsetToSearch)
6✔
107
          val result = new ArrayBuffer[ConsumerRecord[Array[Byte], Array[Byte]]](size)
6✔
108
          result.appendAll(consumer.poll(java.time.Duration.ofMillis(100)).records(tp).asScala)
6✔
109
          // result might be empty if we shift offset to far and there will be
110
          // no messages on the topic due to retention
111
          if(result.isEmpty){
6✔
112
            consumer.seekToBeginning(partitions)
×
113
          }
114
          var currentOffset = consumer.position(tp)
6✔
115
          // Trying to poll records until desired size OR till the end of the topic.
116
          // So when trying to read 70 msgs from topic with only 50, we will return 50 immediately
117
          // instead of waiting for another 20 to be written to the topic.
118
          while(result.size < size && currentOffset < lastOffset) {
6✔
119
            result.appendAll(consumer.poll(java.time.Duration.ofMillis(100)).records(tp).asScala)
×
120
            currentOffset = consumer.position(tp)
×
121
          }
122
          consumer.unsubscribe()
6✔
123
          result.take(size)
6✔
124
        }.take(size).toList
6✔
125
      } finally {
126
        consumer.unsubscribe()
6✔
127
      }
128
    }
129
  }
130

131
  private def doWithTempKafkaConsumer[T](config: KafkaConfig, groupId: Option[String])(fun: KafkaConsumer[Array[Byte], Array[Byte]] => T) = {
132
    // there has to be Kafka's classloader
133
    // http://stackoverflow.com/questions/40037857/intermittent-exception-in-tests-using-the-java-kafka-client
134
    ThreadUtils.withThisAsContextClassLoader(classOf[KafkaClient].getClassLoader) {
6✔
135
      val properties = KafkaUtils.toConsumerProperties(config, groupId)
6✔
136
      // default is read uncommitted which is not a good default
137
      setIsolationLevelIfAbsent(properties, IsolationLevel.READ_COMMITTED)
6✔
138
      properties.setProperty("session.timeout.ms", readTimeoutForTempConsumer(config).toString)
6✔
139
      val consumer: KafkaConsumer[Array[Byte], Array[Byte]] = new KafkaConsumer(properties)
6✔
140
      Using.resource(consumer)(fun)
6✔
141
    }
142
  }
143

144
  def setIsolationLevelIfAbsent(consumerProperties: Properties, isolationLevel: IsolationLevel): Unit = {
145
    consumerProperties.putIfAbsent(ConsumerConfig.ISOLATION_LEVEL_CONFIG, isolationLevel.toString.toLowerCase)
6✔
146
  }
147

148
  private def readTimeoutForTempConsumer(config: KafkaConfig): Long = config.kafkaProperties.flatMap(_.get("session.timeout.ms").map(_.toLong)).getOrElse(defaultTimeoutMillis)
×
149

150
  private def setOffsetToLatest(topic: String, consumer: KafkaConsumer[_, _]): Unit = {
151
    val partitions = consumer.partitionsFor(topic).asScala.map { partition => new TopicPartition(partition.topic(), partition.partition()) }
×
152
    consumer.assign(partitions.asJava)
×
153
    consumer.seekToEnd(partitions.asJava)
×
154
    partitions.foreach(p => consumer.position(p)) //`seekToEnd` is lazy, we have to invoke `position` to change offset
×
155
    consumer.commitSync()
×
156
  }
157

158
  def sendToKafkaWithTempProducer(topic: String, key: Array[Byte], value: Array[Byte])(kafkaProducerCreator: KafkaProducerCreator[Array[Byte], Array[Byte]]): Future[RecordMetadata] = {
159
    sendToKafkaWithTempProducer(new ProducerRecord(topic, key, value))(kafkaProducerCreator)
×
160
  }
161

162
  def sendToKafkaWithTempProducer(record: ProducerRecord[Array[Byte], Array[Byte]])(kafkaProducerCreator: KafkaProducerCreator[Array[Byte], Array[Byte]]): Future[RecordMetadata] = {
163
    //returned future is completed, as this method flushes producer cache
164
    Using.resource(kafkaProducerCreator.createProducer("temp-"+record.topic())) { producer =>
2✔
165
      sendToKafka(record)(producer)
2✔
166
    }
167
  }
168

169
  def sendToKafka[K, V](topic: String, key: K, value: V)(producer: Producer[K, V]): Future[RecordMetadata] = {
170
    sendToKafka(new ProducerRecord(topic, key, value))(producer)
×
171
  }
172

173
  def sendToKafka[K, V](record: ProducerRecord[K, V])(producer: Producer[K, V]): Future[RecordMetadata] = {
174
    val promise = Promise[RecordMetadata]()
4✔
175
    producer.send(record, KafkaUtils.producerCallback(promise))
4✔
176
    promise.future
4✔
177
  }
178

179
  def producerCallback(promise: Promise[RecordMetadata]): Callback =
180
    new Callback {
4✔
181
      override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
182
        val result = if (exception == null) Success(metadata) else Failure(exception)
2✔
183
        promise.complete(result)
2✔
184
      }
185
    }
186

187
  // It can't be in AzureUtils because it must be accessible from Lite Runtime
188
  val azureEventHubsUrl = ".servicebus.windows.net"
8✔
189

190
}
191

192
case class PreparedKafkaTopic(original: String, prepared: String)
193

194

STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc