snowplow / enrich / 8201322218

08 Mar 2024 09:26AM UTC coverage: 74.33%. First build
Pull #872

benjben
Add incomplete events (close #)

Before this change, any error in the enriching workflow would short-circuit
and a bad row would be emitted. After this change, if incomplete events are
enabled, enrichment runs to the end, doing as much as it can and accumulating
errors as it goes. The errors get attached in derived_contexts.
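
The difference between the two behaviours can be sketched in a few lines of standard-library Scala. This is only an illustration: `Event`, `Step`, `failingLookup` and `addCountry` are hypothetical names invented for the sketch, not part of Enrich, and the real workflow is considerably richer.

```scala
object AccumulateSketch {
  // Hypothetical, simplified event: just a bag of string fields.
  final case class Event(fields: Map[String, String])

  // A step either fails with an error message or returns an updated event.
  type Step = Event => Either[String, Event]

  // Two toy steps (assumptions for illustration only).
  val failingLookup: Step = _ => Left("lookup failed")
  val addCountry: Step = e => Right(e.copy(fields = e.fields + ("geo_country" -> "FR")))

  // "Before": stop at the first error, the partially built event is lost.
  def shortCircuit(steps: List[Step], start: Event): Either[String, Event] =
    steps.foldLeft[Either[String, Event]](Right(start))((acc, step) => acc.flatMap(step))

  // "After": run every step; a failing step leaves the event unchanged
  // and its error is recorded next to the event.
  def accumulate(steps: List[Step], start: Event): (List[String], Event) = {
    val (errors, event) = steps.foldLeft((List.empty[String], start)) {
      case ((errs, ev), step) =>
        step(ev) match {
          case Left(err)      => (err :: errs, ev)
          case Right(updated) => (errs, updated)
        }
    }
    (errors.reverse, event)
  }
}
```

With the steps `List(failingLookup, addCountry)`, `shortCircuit` returns only the error, while `accumulate` returns the error together with an event that still received `geo_country`.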

There are now three main steps:

- Mapping and validating the input. This includes mapping fields of
  payload_data to the atomic event (e.g. tr_tt to tr_total while converting
  from string to number) and validating the contexts and unstruct event.
  Everything that goes wrong gets wrapped up in a SchemaViolations bad row.

- Running the enrichments. Everything that goes wrong gets wrapped up in an
  EnrichmentFailures bad row.

- Validating the output. This includes validating the enrichment contexts
  and the lengths of the atomic fields. Everything that goes wrong gets wrapped up
  in a SchemaViolations EnrichmentFailures bad row.
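
The three steps above can be sketched as follows. This is a toy model under stated assumptions, not the Enrich implementation: `Failure`, `Event`, `countryByIp`, `maxCountryLength` and the function names are all hypothetical. Failures are tagged with the step that produced them rather than with a bad row type, so the sketch makes no claim about how bad rows are built.

```scala
object ThreeStepsSketch {
  // Hypothetical failure record: which step failed, and why.
  final case class Failure(step: String, message: String)

  // Hypothetical, tiny "atomic event" with two fields.
  final case class Event(trTotal: Option[Double] = None, geoCountry: Option[String] = None)

  type Result = (List[Failure], Event)

  // Step 1: map the input, e.g. tr_tt (string) to tr_total (number).
  def mapInput(raw: Map[String, String]): Result =
    raw.get("tr_tt") match {
      case None => (Nil, Event())
      case Some(s) =>
        s.toDoubleOption match {
          case Some(total) => (Nil, Event(trTotal = Some(total)))
          case None        => (List(Failure("input", s"tr_tt '$s' is not a number")), Event())
        }
    }

  // Toy lookup table standing in for a real enrichment (assumption).
  val countryByIp: Map[String, String] = Map("192.0.2.1" -> "FRA")

  // Step 2: run the enrichments.
  def enrich(raw: Map[String, String], event: Event): Result =
    raw.get("ip") match {
      case None => (Nil, event)
      case Some(ip) =>
        countryByIp.get(ip) match {
          case Some(country) => (Nil, event.copy(geoCountry = Some(country)))
          case None          => (List(Failure("enrichment", s"no country for ip $ip")), event)
        }
    }

  // Made-up length limit for the sketch.
  val maxCountryLength = 2

  // Step 3: validate the output, here only a field length.
  def validateOutput(event: Event): Result =
    event.geoCountry match {
      case Some(c) if c.length > maxCountryLength =>
        (List(Failure("output", s"geo_country '$c' is longer than $maxCountryLength")), event)
      case _ => (Nil, event)
    }

  // Run all three steps, accumulating failures instead of stopping early.
  def process(raw: Map[String, String]): Result = {
    val (f1, e1) = mapInput(raw)
    val (f2, e2) = enrich(raw, e1)
    val (f3, e3) = validateOutput(e2)
    (f1 ++ f2 ++ f3, e3)
  }
}
```

Calling `process(Map("tr_tt" -> "abc", "ip" -> "192.0.2.1"))` records one failure from the input step and one from the output step, and still returns an event carrying the looked-up country.
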

78 of 89 new or added lines in 12 files covered. (87.64%)

4048 of 5446 relevant lines covered (74.33%)

1.19 hits per line

Source File
83.33% covered
/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/EtlPipeline.scala
/*
 * Copyright (c) 2012-present Snowplow Analytics Ltd.
 * All rights reserved.
 *
 * This software is made available by Snowplow Analytics, Ltd.,
 * under the terms of the Snowplow Limited Use License Agreement, Version 1.0
 * located at https://docs.snowplow.io/limited-use-license-1.0
 * BY INSTALLING, DOWNLOADING, ACCESSING, USING OR DISTRIBUTING ANY PORTION
 * OF THE SOFTWARE, YOU AGREE TO THE TERMS OF SUCH LICENSE AGREEMENT.
 */
package com.snowplowanalytics.snowplow.enrich.common

import cats.data.{Ior, Validated, ValidatedNel}
import cats.effect.kernel.Sync
import cats.implicits._

import org.joda.time.DateTime

import com.snowplowanalytics.iglu.client.IgluCirceClient
import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup

import com.snowplowanalytics.snowplow.badrows.{BadRow, Processor}

import com.snowplowanalytics.snowplow.enrich.common.adapters.AdapterRegistry
import com.snowplowanalytics.snowplow.enrich.common.enrichments.{AtomicFields, EnrichmentManager, EnrichmentRegistry}
import com.snowplowanalytics.snowplow.enrich.common.loaders.CollectorPayload
import com.snowplowanalytics.snowplow.enrich.common.outputs.EnrichedEvent

/** Expresses the end-to-end event pipeline supported by the Scala Common Enrich project. */
object EtlPipeline {

  /**
   * Feature flags available in the current version of Enrich
   * @param acceptInvalid Whether enriched events that are invalid against
   *                      atomic schema should be emitted as enriched events.
   *                      If not they will be emitted as bad rows
   * @param legacyEnrichmentOrder Whether to use the incorrect enrichment order which was historically
   *                      used in early versions of enrich-pubsub and enrich-kinesis.
   */
  case class FeatureFlags(acceptInvalid: Boolean, legacyEnrichmentOrder: Boolean)

  /**
   * @param adapterRegistry Contains all of the events adapters
   * @param enrichmentRegistry Contains configuration for all enrichments to apply
   * @param client Our Iglu client, for schema lookups and validation
   * @param processor The ETL application (Spark/Beam/Stream enrich) and its version
   * @param etlTstamp The ETL timestamp
   * @param input The ValidatedMaybeCanonicalInput
   * @param featureFlags The feature flags available in the current version of Enrich
   * @param invalidCount Function to increment the count of invalid events
   * @param registryLookup Used to look up schemas in the Iglu registries
   * @param atomicFields The atomic fields, whose lengths are checked when validating the output
   * @param emitIncomplete Whether incomplete events are enabled
   */
  def processEvents[F[_]: Sync](
    adapterRegistry: AdapterRegistry[F],
    enrichmentRegistry: EnrichmentRegistry[F],
    client: IgluCirceClient[F],
    processor: Processor,
    etlTstamp: DateTime,
    input: ValidatedNel[BadRow, Option[CollectorPayload]],
    featureFlags: FeatureFlags,
    invalidCount: F[Unit],
    registryLookup: RegistryLookup[F],
    atomicFields: AtomicFields,
    emitIncomplete: Boolean
  ): F[List[Ior[BadRow, EnrichedEvent]]] =
    input match {
      case Validated.Valid(Some(payload)) =>
        adapterRegistry
          .toRawEvents(payload, client, processor, registryLookup) // coverage: 2 hits
          .flatMap { // coverage: 2 hits
            case Validated.Valid(rawEvents) =>
              rawEvents.toList.traverse { event => // coverage: 1 hit
                EnrichmentManager
                  .enrichEvent(
                    enrichmentRegistry,
                    client,
                    processor,
                    etlTstamp,
                    event,
                    featureFlags,
                    invalidCount,
                    registryLookup,
                    atomicFields,
                    emitIncomplete
                  )
                  .value
              }
            case Validated.Invalid(badRow) => // coverage: flagged NEW
              Sync[F].pure(List(Ior.left(badRow))) // coverage: not covered
          }
      case Validated.Invalid(badRows) =>
        Sync[F].pure(badRows.toList.map(br => Ior.left(br))) // coverage: 2 hits
      case Validated.Valid(None) =>
        Sync[F].pure(Nil) // coverage: 1 hit
    }
}
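
As a hedged illustration of what a caller might do with a result of type `List[Ior[BadRow, EnrichedEvent]]`, the sketch below splits such a list into separate outputs. To stay dependency-free it defines a simplified stand-in for `cats.data.Ior` and placeholder `BadRow` and `EnrichedEvent` types. The `Outputs` class, the `route` function and the choice to send a `Both` to the bad and incomplete outputs are assumptions of the sketch, not something this file shows.

```scala
object IorResultsSketch {
  // Simplified stand-in for cats.data.Ior: left only, right only, or both.
  sealed trait Ior[+A, +B]
  object Ior {
    final case class Left[+A](a: A) extends Ior[A, Nothing]
    final case class Right[+B](b: B) extends Ior[Nothing, B]
    final case class Both[+A, +B](a: A, b: B) extends Ior[A, B]
  }

  // Placeholder types, far simpler than the real Snowplow ones.
  final case class BadRow(reason: String)
  final case class EnrichedEvent(eventId: String)

  // Hypothetical grouping of results by destination.
  final case class Outputs(
    bad: List[BadRow],
    good: List[EnrichedEvent],
    incomplete: List[EnrichedEvent]
  )

  // Split results, preserving their order within each output.
  def route(results: List[Ior[BadRow, EnrichedEvent]]): Outputs =
    results.foldRight(Outputs(Nil, Nil, Nil)) {
      case (Ior.Left(br), out)  => out.copy(bad = br :: out.bad)
      case (Ior.Right(ev), out) => out.copy(good = ev :: out.good)
      // Assumption: a Both carries a bad row plus an incomplete event.
      case (Ior.Both(br, ev), out) =>
        out.copy(bad = br :: out.bad, incomplete = ev :: out.incomplete)
    }
}
```

With the real cats type, the same three cases are `Ior.Left`, `Ior.Right` and `Ior.Both`.
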