• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

TouK / nussknacker / 5976637142

25 Aug 2023 01:43PM UTC coverage: 81.47% (+0.03%) from 81.438%
5976637142

push

github

Filemon279
Fix migration in 1.11

25 of 25 new or added lines in 2 files covered. (100.0%)

14865 of 18246 relevant lines covered (81.47%)

5.62 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

77.78
/engine/flink/components-utils/src/main/scala/pl/touk/nussknacker/engine/flink/util/richflink.scala
1
package pl.touk.nussknacker.engine.flink.util
2

3
import org.apache.flink.api.common.typeinfo.TypeInformation
4
import org.apache.flink.streaming.api.datastream.{DataStream, KeyedStream, SingleOutputStreamOperator}
5
import pl.touk.nussknacker.engine.api.{Context, LazyParameter, ValueWithContext}
6
import pl.touk.nussknacker.engine.flink.api.compat.ExplicitUidInOperatorsSupport
7
import pl.touk.nussknacker.engine.flink.api.process.FlinkCustomNodeContext
8
import pl.touk.nussknacker.engine.flink.util.keyed.{StringKeyOnlyMapper, StringKeyedValueMapper}
9
import pl.touk.nussknacker.engine.util.KeyedValue
10

11
import scala.reflect.runtime.universe.TypeTag
12

13
object richflink {

  // Keying helpers added to any DataStream[Context] through implicit enrichment.
  implicit class FlinkKeyOperations(dataStream: DataStream[Context]) {

    // Flat-maps each Context through a StringKeyOnlyMapper (built from the lazy parameter helper and the
    // groupBy parameter) and then keys the resulting stream by the value carried in every ValueWithContext.
    def groupBy(groupBy: LazyParameter[CharSequence])(
        implicit ctx: FlinkCustomNodeContext
    ): KeyedStream[ValueWithContext[String], String] = {
      val stringKeyTypeInfo = ctx.valueWithContextInfo.forType(TypeInformation.of(classOf[String]))
      val withKeys =
        dataStream.flatMap(new StringKeyOnlyMapper(ctx.lazyParameterHelper, groupBy), stringKeyTypeInfo)
      withKeys.keyBy((element: ValueWithContext[String]) => element.value)
    }

    // Same idea as groupBy, but every element additionally carries the evaluated `value` parameter:
    // the stream is flat-mapped through a StringKeyedValueMapper and keyed by the `key` of each KeyedValue.
    def groupByWithValue[T <: AnyRef: TypeTag](groupBy: LazyParameter[CharSequence], value: LazyParameter[T])(
        implicit ctx: FlinkCustomNodeContext
    ): KeyedStream[ValueWithContext[KeyedValue[String, T]], String] = {
      // type information is derived from the key parameter converted to String (via toString) and the value parameter
      val stringKey          = groupBy.map[String]((rawKey: CharSequence) => rawKey.toString)
      val keyedValueTypeInfo = keyed.typeInfo(ctx, stringKey, value)
      val withKeyedValues =
        dataStream.flatMap(new StringKeyedValueMapper(ctx.lazyParameterHelper, groupBy, value), keyedValueTypeInfo)
      withKeyedValues.keyBy((element: ValueWithContext[KeyedValue[String, T]]) => element.value.key)
    }

  }

  // Adds setUidWithName to any DataStream[T] through implicit enrichment.
  implicit class ExplicitUid[T](dataStream: DataStream[T]) {

    // In custom transformers the operator name is set to nodeId, so that some internal Flink metrics
    // (e.g. RocksDB) are reported with the operator_name tag equal to nodeId.
    // In most cases the uid should be set together with the operator name; when that is not the case,
    // use ExplicitUidInOperatorsSupport explicitly instead of this method.
    def setUidWithName(
        implicit ctx: FlinkCustomNodeContext,
        explicitUidInStatefulOperators: FlinkCustomNodeContext => Boolean
    ): DataStream[T] = {
      val uidRequested = explicitUidInStatefulOperators(ctx)
      val withUid      = ExplicitUidInOperatorsSupport.setUidIfNeed[T](uidRequested, ctx.nodeId)(dataStream)
      withUid match {
        // only a SingleOutputStreamOperator can be named; any other stream is returned as it is
        case nameable: SingleOutputStreamOperator[T] => nameable.name(ctx.nodeId)
        case unchanged                               => unchanged
      }
    }

  }

}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc