• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

snowplow / enrich / 8201322218

08 Mar 2024 09:26AM UTC coverage: 74.33%. First build
8201322218

Pull #872

github

benjben
Add incomplete events (close #)

Before this change, any error in the enriching workflow would short
circuit and a bad row would be emitted. After this change, if incomplete events
are enabled, the enriching goes to the end with what is possible,
accumulating errors as it goes. Errors get attached in derived_contexts.

There are now 3 main steps:

- Mapping and validating the input. This includes mapping fields of
  payload_data to the atomic event (e.g. tr_tt to tr_total while converting
  from string to number) and validating the contexts and unstruct event.
  Everything that goes wrong gets wrapped up in a SchemaViolations bad row.

- Running the enrichments. Everything that goes wrong gets wrapped up in an
  EnrichmentFailures bad row.

- Validating the output. This includes validating the enrichments contexts
  and the atomic fields lengths. Everything that goes wrong gets wrapped up
  in a SchemaViolations or EnrichmentFailures bad row.
Pull Request #872: Add incomplete events

78 of 89 new or added lines in 12 files covered. (87.64%)

4048 of 5446 relevant lines covered (74.33%)

1.19 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/modules/nsq/src/main/scala/com/snowplowanalytics/snowplow/enrich/nsq/Main.scala
1
/*
2
 * Copyright (c) 2023-present Snowplow Analytics Ltd.
3
 * All rights reserved.
4
 *
5
 * This software is made available by Snowplow Analytics, Ltd.,
6
 * under the terms of the Snowplow Limited Use License Agreement, Version 1.0
7
 * located at https://docs.snowplow.io/limited-use-license-1.0
8
 * BY INSTALLING, DOWNLOADING, ACCESSING, USING OR DISTRIBUTING ANY PORTION
9
 * OF THE SOFTWARE, YOU AGREE TO THE TERMS OF SUCH LICENSE AGREEMENT.
10
 */
11
package com.snowplowanalytics.snowplow.enrich.nsq
12

13
import scala.concurrent.duration._
14

15
import cats.Parallel
16
import cats.implicits._
17

18
import cats.effect.{ExitCode, IO, IOApp}
19
import cats.effect.kernel.{Resource, Sync}
20
import cats.effect.metrics.CpuStarvationWarningMetrics
21

22
import org.typelevel.log4cats.Logger
23
import org.typelevel.log4cats.slf4j.Slf4jLogger
24

25
import com.snowplowanalytics.snowplow.enrich.common.fs2.Run
26
import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.BlobStorageClients
27
import com.snowplowanalytics.snowplow.enrich.common.fs2.io.Clients.Client
28

29
import com.snowplowanalytics.snowplow.enrich.aws.S3Client
30

31
import com.snowplowanalytics.snowplow.enrich.gcp.GcsClient
32

33
import com.snowplowanalytics.snowplow.enrich.azure.AzureStorageClient
34

35
import com.snowplowanalytics.snowplow.enrich.nsq.generated.BuildInfo
36

37
/** Entry point for the NSQ variant of enrich.
  *
  * Wires the shared [[Run.run]] workflow to NSQ-specific sources/sinks and
  * to the blob-storage clients (GCS / S3 / Azure) enabled in configuration.
  */
object Main extends IOApp {

  // Check for CPU starvation less aggressively than the cats-effect default.
  override def runtimeConfig =
    super.runtimeConfig.copy(cpuStarvationCheckInterval = 10.seconds)

  private implicit val logger: Logger[IO] = Slf4jLogger.getLogger[IO]

  // Downgrade starvation warnings to debug so they don't pollute logs.
  override def onCpuStarvationWarn(metrics: CpuStarvationWarningMetrics): IO[Unit] =
    Logger[IO].debug(s"Cats Effect measured responsiveness in excess of ${metrics.starvationInterval * metrics.starvationThreshold}")

  // Nsq records must not exceed 1MB
  private val MaxRecordSize = 1000000

  def run(args: List[String]): IO[ExitCode] =
    Run.run[IO, Record[IO]](
      args,
      BuildInfo.name,
      // Extract a clean "major.minor.patch[-qualifier]" version from BuildInfo.
      // Fixed: dots must be escaped (bare `.` matched any character) and
      // components may have several digits (e.g. "3.10.0"), hence `\d+`.
      """(\d+\.\d+\.\d+(-\w*\d*)?)""".r.findFirstIn(BuildInfo.version).getOrElse(BuildInfo.version),
      BuildInfo.description,
      IO.pure,
      (input, _) => Source.init(input),
      out => Sink.initAttributed(out), // enriched events
      out => Sink.initAttributed(out), // pii events
      out => Sink.init(out),           // bad rows
      out => Sink.initAttributed(out), // incomplete events
      checkpoint,
      createBlobStorageClient,
      _.data,
      MaxRecordSize,
      None,
      None
    )

  /** Acknowledge all processed NSQ records in parallel. */
  private def checkpoint[F[_]: Parallel: Sync](records: List[Record[F]]): F[Unit] =
    records.parTraverse_(_.ack)

  /** Build one client resource per blob-storage backend enabled in config. */
  private def createBlobStorageClient(conf: BlobStorageClients): List[Resource[IO, Client[IO]]] = {
    val gcs = if (conf.gcs) Some(Resource.eval(GcsClient.mk[IO])) else None
    val aws = if (conf.s3) Some(S3Client.mk[IO]) else None
    val azure = conf.azureStorage.map(s => AzureStorageClient.mk[IO](s))
    List(gcs, aws, azure).flatten
  }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc