• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

ByteOps-swe / MVP / 8231589811

11 Mar 2024 10:53AM UTC coverage: 85.561% (+0.9%) from 84.669%
8231589811

push

github

Barutta02
correction test

27 of 27 new or added lines in 8 files covered. (100.0%)

5 existing lines in 3 files now uncovered.

1600 of 1870 relevant lines covered (85.56%)

0.86 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.31
/PythonSensorsSimulator/Test/Integration_Test/test_HealthModelIntegration.py
1
# pylint: skip-file
2
import os
1✔
3
from datetime import datetime
1✔
4
import asyncio
1✔
5
import pytest
1✔
6
import clickhouse_connect
1✔
7

8
from ...Model.Simulators.Coordinate import Coordinate
1✔
9
from ...Model.Simulators.Misurazione import Misurazione
1✔
10
from ...Model.Writers.KafkaWriter import KafkaWriter
1✔
11
from ...Model.Writers.kafkaAdapter.KafkaConfluentAdapter import KafkaConfluentAdapter
1✔
12
from ...Model.AdapterMisurazione import AdapterMisurazione
1✔
13
from ...Model.Simulators.SensorTypes import SensorTypes
1✔
14

15
# Kafka broker address; each part can be overridden through the environment
# variable of the same name and otherwise falls back to the default given here.
KAFKA_HOST = os.getenv("KAFKA_HOST", "kafka")
KAFKA_PORT = os.getenv("KAFKA_PORT", "9092")

# Kafka topics that receive the temperature and humidity measurements.
topic_tmp = "temperature"
topic_umd = "humidity"

# Table of the `innovacity` ClickHouse database that the test queries.
table_to_test = "healthScore"
1✔
20

21
@pytest.fixture(scope='module')
def clickhouse_client():
    """Provide one ClickHouse client for the whole module and close it afterwards.

    The client connects to host ``clickhouse`` on port 8123 and uses the
    ``innovacity`` database.
    """
    connection = clickhouse_connect.get_client(
        host='clickhouse',
        port=8123,
        database="innovacity",
    )
    yield connection
    # Reached when pytest finalizes the module-scoped fixture.
    connection.close()
1✔
26

27
@pytest.fixture(scope='module')
def kafka_writer_tmp():
    """Provide a module-scoped KafkaWriter bound to the temperature topic.

    The writer wraps a KafkaConfluentAdapter built from ``topic_tmp`` and the
    configured ``KAFKA_HOST`` / ``KAFKA_PORT``. No teardown is performed.
    """
    yield KafkaWriter(KafkaConfluentAdapter(topic_tmp, KAFKA_HOST, KAFKA_PORT))
1✔
32

33
@pytest.fixture(scope='module')
def kafka_writer_umd():
    """Provide a module-scoped KafkaWriter bound to the humidity topic.

    The writer wraps a KafkaConfluentAdapter built from ``topic_umd`` and the
    configured ``KAFKA_HOST`` / ``KAFKA_PORT``. No teardown is performed.
    """
    yield KafkaWriter(KafkaConfluentAdapter(topic_umd, KAFKA_HOST, KAFKA_PORT))
1✔
38

39
@pytest.mark.asyncio
async def test_heatlh_score_integration(clickhouse_client, kafka_writer_tmp, kafka_writer_umd):
    """Publish temperature and humidity readings and check the resulting health score row.

    Three temperature measurements (value 25) and three humidity measurements
    (value 50) for cell ``HSTestCell`` are written to Kafka and flushed. The
    test then polls ``innovacity.<table_to_test>`` for rows whose timestamp is
    not older than the test start, for roughly 15 seconds, and asserts that the
    third column of the newest row equals 0.

    Args:
        clickhouse_client: ClickHouse client fixture used to run the query.
        kafka_writer_tmp: writer fixture for the temperature topic.
        kafka_writer_umd: writer fixture for the humidity topic.

    Errors are deliberately not wrapped: a blanket ``except Exception`` would
    also catch ``AssertionError`` and report every failed assertion as a
    database connection problem. Letting exceptions propagate keeps pytest's
    own, accurate failure report.
    """
    timestamp = datetime.now()
    cell = "HSTestCell"
    longitude = 11.859271
    latitude = 45.39214

    def publish(writer, sensor_ids, value, sensor_type):
        """Write one measurement per sensor id through ``writer``."""
        for sensor_id in sensor_ids:
            writer.write(
                AdapterMisurazione(
                    Misurazione(
                        timestamp,
                        value,
                        sensor_type,
                        Coordinate(latitude, longitude),
                        sensor_id,
                        cell,
                    )
                )
            )

    publish(
        kafka_writer_tmp,
        ("HS_1_tmp", "HS_2_tmp", "HS_3_tmp"),
        25,
        SensorTypes.TEMPERATURE.value,
    )
    publish(
        kafka_writer_umd,
        ("HS_1_umd", "HS_2_umd", "HS_3_umd"),
        50,
        SensorTypes.HUMIDITY.value,
    )

    kafka_writer_tmp.flush_kafka_producer()
    kafka_writer_umd.flush_kafka_producer()

    query = f"SELECT * FROM innovacity.{table_to_test} where timestamp >= '{timestamp}' order by timestamp desc"

    # Poll until at least one row shows up or the time budget is used up.
    max_seconds_to_wait = 15
    poll_interval = 0.5
    attempts = 0
    result = clickhouse_client.query(query)
    while not result.result_rows and attempts * poll_interval < max_seconds_to_wait:
        await asyncio.sleep(poll_interval)
        result = clickhouse_client.query(query)
        attempts += 1

    expected_result = 0
    assert result.result_rows, (
        f"no rows found in innovacity.{table_to_test} "
        f"within {max_seconds_to_wait} seconds"
    )
    # NOTE(review): column index 2 is presumably the health score - confirm
    # against the table schema.
    newest_row = result.result_rows[0]
    assert float(newest_row[2]) == pytest.approx(expected_result), (
        f"unexpected value in newest row: {newest_row}"
    )
1✔
86

STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc