ByteOps-swe / MVP / 8452709948

27 Mar 2024 01:35PM UTC coverage: 69.685% (-13.9%) from 83.559%
push

github

Barutta02
modify schema

1 of 16 new or added lines in 6 files covered. (6.25%)

350 existing lines in 27 files now uncovered.

1547 of 2220 relevant lines covered (69.68%)

0.7 hits per line
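
The percentages in this summary follow from the line counts. A minimal sketch of the arithmetic, assuming coverage is simply covered lines divided by relevant lines; the helper name `coverage_percent` is hypothetical and not part of Coveralls:

```python
def coverage_percent(covered: int, relevant: int) -> float:
    """Return covered/relevant as a percentage (0.0 if nothing is relevant)."""
    if relevant == 0:
        return 0.0
    return 100.0 * covered / relevant


# Figures taken from the build summary above.
overall = coverage_percent(1547, 2220)
new_lines = coverage_percent(1, 16)
delta = overall - 83.559  # change relative to the previous build

print(f"overall:   {overall:.3f}%")
print(f"new lines: {new_lines:.2f}%")
print(f"delta:     {delta:+.1f}%")
```

The rounded results match the reported 69.685%, 6.25%, and -13.9%.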

Source File

/FaustProcessing/processing.py (0.0% covered)
import faust
from confluent_kafka.schema_registry import SchemaRegistryClient

from HealthStateModel.health_calculator import health_calculator
from HealthStateModel.health_calculator_thread import health_calculator_thread
from HealthStateModel.Writers.composite_writer import composite_writer
from ProcessingAdapter.faust_measurement import faust_measurement
from ProcessingAdapter.health_model_processor_adapter import health_model_processor_adapter

schema_registry_url = "http://schema_registry:8081"
schema_name = "misurazione"

# Composite writer: a Confluent Kafka writer for "HealthScore" plus a stdout writer.
healthWriter = (
    composite_writer()
    .add_kafka_confluent_writer("HealthScore", "kafka", "9092", schema_registry_url)
    .add_std_out_writer()
)
# Note: this rebinds the imported class name to an instance.
health_calculator = health_calculator()
healthThread = health_calculator_thread(health_calculator, healthWriter, 5)

temperature_topic = "temperature"
humidity_topic = "humidity"
dustPm10_topic = "dust_PM10"

schema_registry = SchemaRegistryClient({'url': schema_registry_url})
app = faust.App('myapp', broker='kafka://kafka:9092')

# Register the Avro serializer with the Schema Registry
app.conf.serializer = 'faust.AvroSerializer'
app.conf.kafka_key_serializer = 'faust.AvroSerializer'
app.conf.kafka_value_serializer = 'faust.AvroSerializer'
app.conf.schema_registry = schema_registry
app.conf.schema_registry_url = schema_registry_url

# One topic description covering all three measurement topics.
topic = app.topic(
    temperature_topic,
    humidity_topic,
    dustPm10_topic,
    value_type=faust_measurement,
)

measurement_processor = health_model_processor_adapter(health_calculator)


@app.agent(topic)
async def process(measurements):
    # The try wraps the whole loop, so the first exception ends the iteration.
    try:
        async for measurement in measurements:
            await measurement_processor.process(measurement)
    except Exception as e:
        print(f"Error while processing measurements: {e}")


@app.task()
async def mytask():
    # Start the health calculator thread once the app is running.
    healthThread.start()


app.main()
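
Every line of this file is reported as uncovered, and the agent body is the part that can be exercised most easily without a Kafka broker. The sketch below is an illustration only, not the project's test suite: `StubProcessor` and `fake_stream` are hypothetical stand-ins for `health_model_processor_adapter` and the Faust stream, and `None` is assumed to represent a malformed measurement. It compares the agent's error handling (one `try` around the whole loop) with a variant that catches errors per measurement.

```python
import asyncio


class StubProcessor:
    """Hypothetical stand-in for health_model_processor_adapter."""

    def __init__(self):
        self.processed = []

    async def process(self, measurement):
        if measurement is None:  # assumed marker for a malformed measurement
            raise ValueError("malformed measurement")
        self.processed.append(measurement)


async def fake_stream(items):
    """Async generator standing in for the Faust stream."""
    for item in items:
        yield item


async def process_outer_try(measurements, processor):
    """Same shape as the agent: a single try around the whole loop."""
    try:
        async for measurement in measurements:
            await processor.process(measurement)
    except Exception as e:
        print(f"Error while processing measurements: {e}")


async def process_inner_try(measurements, processor):
    """Variant: try inside the loop, so one bad item is skipped."""
    async for measurement in measurements:
        try:
            await processor.process(measurement)
        except Exception as e:
            print(f"Skipping measurement: {e}")


items = [21.5, None, 40.0]
outer, inner = StubProcessor(), StubProcessor()
asyncio.run(process_outer_try(fake_stream(items), outer))
asyncio.run(process_inner_try(fake_stream(items), inner))
print(outer.processed)  # [21.5]
print(inner.processed)  # [21.5, 40.0]
```

With the outer `try`, the loop stops at the first failing measurement and the later value is never processed; with the inner `try`, processing continues. Which behaviour is wanted here is a design decision for the project.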