snowplow / enrich / 8201322218

08 Mar 2024 09:26AM UTC · coverage: 74.33% · first build (8201322218)

Pull #872 (github) · benjben

Add incomplete events (close #)

Before this change, any error in the enrichment workflow would short-circuit
processing and emit a bad row. After this change, if incomplete events are
enabled, enrichment runs to the end with whatever is possible, accumulating
errors as it goes. The accumulated errors get attached to the event's derived_contexts.
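
The type that makes this possible is cats.data.Ior, which the code below uses in its Result values: Ior.Right is a fully enriched event, Ior.Left is a bad row, and Ior.Both carries the accumulated errors together with the incomplete event that still gets emitted. A minimal sketch of that routing, using simplified stand-in types rather than the real BadRow/EnrichedEvent:

import cats.data.Ior

object IorRoutingSketch {
  // Simplified stand-ins for the real BadRow / EnrichedEvent types
  type BadRow = String
  type Event  = Map[String, String]

  // An enrichment result is a bad row, a good event, or both at once;
  // Ior.Both is what makes incomplete events possible (cf. sinkChunk below).
  def route(result: Ior[BadRow, Event]): String =
    result match {
      case Ior.Right(event)     => s"good sink: $event"
      case Ior.Left(bad)        => s"bad sink: $bad"
      case Ior.Both(bad, event) => s"bad sink: $bad; incomplete sink: $event"
    }
}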

There are now 3 main steps (a chaining sketch follows the list):

- Mapping and validating the input. This includes mapping fields of
  payload_data to the atomic event (e.g. tr_tt to tr_total while converting
  from string to number) and validating the contexts and unstruct event.
  Everything that goes wrong gets wrapped up in a SchemaViolations bad row.

- Running the enrichments. Everything that goes wrong gets wrapped up in an
  EnrichmentFailures bad row.

- Validating the output. This includes validating the enrichment contexts
  and the atomic field lengths. Everything that goes wrong gets wrapped up
  in a SchemaViolations bad row.
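
Below is a minimal, hypothetical sketch of how three such steps can be chained while accumulating errors; the step functions and field checks are invented for illustration (the real logic lives in EtlPipeline.processEvents). Ior's flatMap keeps going through Ior.Both, combining the lefts, and still short-circuits on a pure Ior.Left where nothing is salvageable.

import cats.data.Ior
import cats.implicits._

object StepChainSketch {
  type Errors = List[String]        // accumulated failure messages
  type Event  = Map[String, String] // simplified stand-in for EnrichedEvent

  // Hypothetical step: drop a non-numeric tr_tt but keep enriching
  def mapAndValidateInput(raw: Map[String, String]): Ior[Errors, Event] =
    raw.get("tr_tt").filter(_.toDoubleOption.isEmpty) match {
      case Some(_) => Ior.both(List("tr_tt is not numeric"), raw - "tr_tt")
      case None    => Ior.right(raw)
    }

  // Hypothetical step: pretend an enrichment succeeded
  def runEnrichments(event: Event): Ior[Errors, Event] =
    Ior.right(event + ("geo_country" -> "FR"))

  // Hypothetical step: truncate an oversized field, recording the error
  def validateOutput(event: Event): Ior[Errors, Event] =
    if (event.getOrElse("v_tracker", "").length > 100)
      Ior.both(List("v_tracker exceeds 100 chars"), event.updated("v_tracker", event("v_tracker").take(100)))
    else Ior.right(event)

  // flatMap continues through Ior.Both (combining errors on the left via Semigroup)
  // and still short-circuits on a pure Ior.Left (nothing salvageable).
  def enrich(raw: Map[String, String]): Ior[Errors, Event] =
    mapAndValidateInput(raw).flatMap(runEnrichments).flatMap(validateOutput)
}
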
Pull Request #872: Add incomplete events

78 of 89 new or added lines in 12 files covered. (87.64%)

4048 of 5446 relevant lines covered (74.33%)

1.19 hits per line

Source file (71.95% covered):
/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Enrich.scala
/*
 * Copyright (c) 2020-present Snowplow Analytics Ltd.
 * All rights reserved.
 *
 * This software is made available by Snowplow Analytics, Ltd.,
 * under the terms of the Snowplow Limited Use License Agreement, Version 1.0
 * located at https://docs.snowplow.io/limited-use-license-1.0
 * BY INSTALLING, DOWNLOADING, ACCESSING, USING OR DISTRIBUTING ANY PORTION
 * OF THE SOFTWARE, YOU AGREE TO THE TERMS OF SUCH LICENSE AGREEMENT.
 */
package com.snowplowanalytics.snowplow.enrich.common.fs2

import java.nio.charset.StandardCharsets.UTF_8
import java.time.Instant
import java.util.Base64

import org.joda.time.DateTime

import cats.data.{Ior, NonEmptyList, ValidatedNel}
import cats.{Monad, Parallel}
import cats.implicits._

import cats.effect.kernel.{Async, Clock, Sync}
import cats.effect.implicits._

import fs2.{Pipe, Stream}

import _root_.io.sentry.SentryClient

import _root_.io.circe.syntax._

import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger

import com.snowplowanalytics.iglu.client.IgluCirceClient
import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup

import com.snowplowanalytics.snowplow.badrows.{BadRow, Failure, Processor, Payload => BadRowPayload}

import com.snowplowanalytics.snowplow.enrich.common.EtlPipeline
import com.snowplowanalytics.snowplow.enrich.common.outputs.EnrichedEvent
import com.snowplowanalytics.snowplow.enrich.common.adapters.AdapterRegistry
import com.snowplowanalytics.snowplow.enrich.common.loaders.{CollectorPayload, ThriftLoader}
import com.snowplowanalytics.snowplow.enrich.common.enrichments.{AtomicFields, EnrichmentRegistry}
import com.snowplowanalytics.snowplow.enrich.common.utils.ConversionUtils

import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.FeatureFlags

object Enrich {

  private implicit def unsafeLogger[F[_]: Sync]: Logger[F] =
    Slf4jLogger.getLogger[F]

  /**
   * Run a primary enrichment stream, reading from the [[Environment]] source, enriching
   * via [[enrichWith]] and sinking into the Good, Bad, and Pii sinks.
   *
   * The stream won't download any enrichment DBs; that is the responsibility of [[Assets]].
   * [[Assets.State.make]] downloads assets for the first time unconditionally during
   * [[Environment]] initialisation; then, if `assetsUpdatePeriod` has been specified,
   * they'll be refreshed periodically by [[Assets.updateStream]].
   */
  def run[F[_]: Async, A](env: Environment[F, A]): Stream[F, Unit] = {
    val enrichmentsRegistry: F[EnrichmentRegistry[F]] = env.enrichments.get.map(_.registry)
    val enrich: Enrich[F] =
      enrichWith[F](
        enrichmentsRegistry,
        env.adapterRegistry,
        env.igluClient,
        env.sentry,
        env.processor,
        env.featureFlags,
        env.metrics.invalidCount,
        env.registryLookup,
        env.atomicFields,
        env.sinkIncomplete.isDefined
      )

    val enriched =
      env.source.chunks
        .evalTap(chunk => Logger[F].debug(s"Starting to process chunk of size ${chunk.size}"))
        .evalTap(chunk => env.metrics.rawCount(chunk.size))
        .map(chunk => chunk.map(a => (a, env.getPayload(a))))
        .evalMap(chunk =>
          for {
            begin <- Clock[F].realTime
            result <-
              env.semaphore.permit.use { _ =>
                chunk.toList.map { case (orig, bytes) => enrich(bytes).map((orig, _)) }.parSequenceN(env.streamsSettings.concurrency.enrich)
              }
            end <- Clock[F].realTime
            _ <- Logger[F].debug(s"Chunk of size ${chunk.size} enriched in ${(end - begin).toMillis} ms")
          } yield result
        )

    val sinkAndCheckpoint: Pipe[F, List[(A, Result)], Unit] =
      _.parEvalMap(env.streamsSettings.concurrency.sink)(chunk =>
        for {
          begin <- Clock[F].realTime
          result <- sinkChunk(chunk.map(_._2), env).as(chunk.map(_._1))
          end <- Clock[F].realTime
          _ <- Logger[F].debug(s"Chunk of size ${chunk.size} sunk in ${(end - begin).toMillis} ms")
        } yield result
      )
        .evalMap(env.checkpoint)

    enriched.through(CleanCancellation(sinkAndCheckpoint))
  }

  /**
   * Enrich a single `CollectorPayload` to get a list of bad rows and/or enriched events
   * @return enriched events and/or bad rows, along with the collector timestamp
   */
  def enrichWith[F[_]: Sync](
    enrichRegistry: F[EnrichmentRegistry[F]],
    adapterRegistry: AdapterRegistry[F],
    igluClient: IgluCirceClient[F],
    sentry: Option[SentryClient],
    processor: Processor,
    featureFlags: FeatureFlags,
    invalidCount: F[Unit],
    registryLookup: RegistryLookup[F],
    atomicFields: AtomicFields,
    emitIncomplete: Boolean
  )(
    row: Array[Byte]
  ): F[Result] = {
    val payload = ThriftLoader.toCollectorPayload(row, processor, featureFlags.tryBase64Decoding)
    val collectorTstamp = payload.toOption.flatMap(_.flatMap(_.context.timestamp).map(_.getMillis))

    val result =
      for {
        etlTstamp <- Clock[F].realTime.map(time => new DateTime(time.toMillis))
        registry <- enrichRegistry
        enriched <- EtlPipeline.processEvents[F](
                      adapterRegistry,
                      registry,
                      igluClient,
                      processor,
                      etlTstamp,
                      payload,
                      FeatureFlags.toCommon(featureFlags),
                      invalidCount,
                      registryLookup,
                      atomicFields,
                      emitIncomplete
                    )
      } yield (enriched, collectorTstamp)

    result.handleErrorWith(sendToSentry[F](row, sentry, processor, collectorTstamp))
  }

  /** Stringify `ThriftLoader` result for debugging purposes */
  def payloadToString(payload: ValidatedNel[BadRow.CPFormatViolation, Option[CollectorPayload]]): String =
    payload.fold(_.asJson.noSpaces, _.map(_.toBadRowPayload.asJson.noSpaces).getOrElse("None"))

  /** Log an error, turn the problematic `CollectorPayload` into a `BadRow` and notify Sentry if configured */
  def sendToSentry[F[_]: Sync: Clock](
    original: Array[Byte],
    sentry: Option[SentryClient],
    processor: Processor,
    collectorTstamp: Option[Long]
  )(
    error: Throwable
  ): F[Result] =
    for {
      _ <- Logger[F].error("Runtime exception during payload enrichment. CollectorPayload converted to generic_error and ack'ed")
      now <- Clock[F].realTimeInstant
      badRow = genericBadRow(original, now, error, processor)
      _ <- sentry match {
             case Some(client) =>
               Sync[F].delay(client.sendException(error))
             case None =>
               Sync[F].unit
           }
    } yield (List(Ior.left(badRow)), collectorTstamp)

  /** Build a `generic_error` bad row for unhandled runtime errors */
  def genericBadRow(
    row: Array[Byte],
    time: Instant,
    error: Throwable,
    processor: Processor
  ): BadRow.GenericError = {
    val base64 = new String(Base64.getEncoder.encode(row))
    val rawPayload = BadRowPayload.RawPayload(base64)
    val failure = Failure.GenericFailure(time, NonEmptyList.one(ConversionUtils.cleanStackTrace(error)))
    BadRow.GenericError(processor, failure, rawPayload)
  }

  /** Partition a chunk of results into bad rows, enriched events and incomplete events, then sink them in parallel */
  def sinkChunk[F[_]: Parallel: Sync, A](
    chunk: List[Result],
    env: Environment[F, A]
  ): F[Unit] = {
    val (bad, enriched, incomplete) =
      chunk
        .flatMap(_._1)
        .foldLeft((List.empty[BadRow], List.empty[EnrichedEvent], List.empty[EnrichedEvent])) {
          case (previous, item) =>
            val (bad, enriched, incomplete) = previous
            item match {
              case Ior.Right(e) => (bad, e :: enriched, incomplete)
              case Ior.Left(br) => (br :: bad, enriched, incomplete)
              case Ior.Both(br, i) => (br :: bad, enriched, i :: incomplete)
            }
        }

    val (moreBad, good) = enriched.map { e =>
      serializeEnriched(e, env.processor, env.streamsSettings.maxRecordSize)
        .map(bytes => (e, AttributedData(bytes, env.goodPartitionKey(e), env.goodAttributes(e))))
    }.separate

    val (incompleteTooBig, incompleteBytes) = incomplete.map { e =>
      serializeEnriched(e, env.processor, env.streamsSettings.maxRecordSize)
        .map(bytes => AttributedData(bytes, env.goodPartitionKey(e), env.goodAttributes(e)))
    }.separate

    val allBad = (bad ++ moreBad).map(badRowResize(env, _))

    List(
      sinkGood(
        good,
        env.sinkGood,
        env.metrics.goodCount,
        env.metadata.observe,
        env.sinkPii,
        env.piiPartitionKey,
        env.piiAttributes,
        env.processor,
        env.streamsSettings.maxRecordSize
      ) *> env.metrics.enrichLatency(chunk.headOption.flatMap(_._2)),
      sinkBad(allBad, env.sinkBad, env.metrics.badCount),
      if (incompleteTooBig.nonEmpty) Logger[F].warn(s"${incompleteTooBig.size} incomplete events discarded because they are too big")
      else Sync[F].unit,
      sinkIncomplete(incompleteBytes, env.sinkIncomplete)
    ).parSequence_
  }

  def sinkGood[F[_]: Sync](
    good: List[(EnrichedEvent, AttributedData[Array[Byte]])],
    sink: AttributedByteSink[F],
    incMetrics: Int => F[Unit],
    metadata: List[EnrichedEvent] => F[Unit],
    piiSink: Option[AttributedByteSink[F]],
    piiPartitionKey: EnrichedEvent => String,
    piiAttributes: EnrichedEvent => Map[String, String],
    processor: Processor,
    maxRecordSize: Int
  ): F[Unit] = {
    val enriched = good.map(_._1)
    val serialized = good.map(_._2)
    sink(serialized) *> incMetrics(good.size) *> metadata(enriched) *> sinkPii(enriched,
                                                                               piiSink,
                                                                               piiPartitionKey,
                                                                               piiAttributes,
                                                                               processor,
                                                                               maxRecordSize
    )
  }

  def sinkBad[F[_]: Monad](
    bad: List[Array[Byte]],
    sink: ByteSink[F],
    incMetrics: Int => F[Unit]
  ): F[Unit] =
    sink(bad) *> incMetrics(bad.size)

  def sinkPii[F[_]: Sync](
    enriched: List[EnrichedEvent],
    maybeSink: Option[AttributedByteSink[F]],
    partitionKey: EnrichedEvent => String,
    attributes: EnrichedEvent => Map[String, String],
    processor: Processor,
    maxRecordSize: Int
  ): F[Unit] =
    maybeSink match {
      case Some(sink) =>
        val (bad, serialized) =
          enriched
            .flatMap(ConversionUtils.getPiiEvent(processor, _))
            .map(e => serializeEnriched(e, processor, maxRecordSize).map(AttributedData(_, partitionKey(e), attributes(e))))
            .separate
        val logging =
          if (bad.nonEmpty)
            Logger[F].error(s"${bad.size} PII events couldn't get sent because they are too big")
          else
            Sync[F].unit
        sink(serialized) *> logging
      case None =>
        Sync[F].unit
    }

  def sinkIncomplete[F[_]: Sync](
    incomplete: List[AttributedData[Array[Byte]]],
    maybeSink: Option[AttributedByteSink[F]]
  ): F[Unit] =
    maybeSink match {
      case Some(sink) => sink(incomplete)
      case None => Sync[F].unit
    }

  /** Serialize an enriched event as TSV, or return a `SizeViolation` bad row if it exceeds `maxRecordSize` */
  def serializeEnriched(
    enriched: EnrichedEvent,
    processor: Processor,
    maxRecordSize: Int
  ): Either[BadRow, Array[Byte]] = {
    val asStr = ConversionUtils.tabSeparatedEnrichedEvent(enriched)
    val asBytes = asStr.getBytes(UTF_8)
    val size = asBytes.length
    if (size > maxRecordSize) {
      val msg = s"event passed enrichment but then exceeded the maximum allowed size $maxRecordSize bytes"
      val br = BadRow
        .SizeViolation(
          processor,
          Failure.SizeViolation(Instant.now(), maxRecordSize, size, msg),
          BadRowPayload.RawPayload(asStr.take(maxRecordSize * 8 / 10))
        )
      Left(br)
    } else Right(asBytes)
  }

  /**
   * Check if a plain bad row (such as `enrichment_failure`) exceeds the `maxRecordSize`.
   * If it does, turn it into a size violation bad row with a trimmed payload.
   */
  def badRowResize[F[_], A](env: Environment[F, A], badRow: BadRow): Array[Byte] = {
    val asStr = badRow.compact
    val originalBytes = asStr.getBytes(UTF_8)
    val size = originalBytes.size
    val maxRecordSize = env.streamsSettings.maxRecordSize
    if (size > maxRecordSize) {
      val msg = s"event failed enrichment, but resulting bad row exceeds allowed size $maxRecordSize"
      BadRow
        .SizeViolation(
          env.processor,
          Failure.SizeViolation(Instant.now(), maxRecordSize, size, msg),
          BadRowPayload.RawPayload(asStr.take(maxRecordSize / 10))
        )
        .compact
        .getBytes(UTF_8)
    } else originalBytes
  }
}