• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

ByteOps-swe / MVP / 8215196951

09 Mar 2024 02:27PM UTC coverage: 9.381% (-61.8%) from 71.198%
8215196951

push

github

Shinji76
Merge branch 'develop' into Action-test

47 of 59 new or added lines in 6 files covered. (79.66%)

956 existing lines in 41 files now uncovered.

144 of 1535 relevant lines covered (9.38%)

0.09 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/PythonSensorsSimulator/Test/Integration_Test/test_HealthModelIntegration.py
1
# pylint: skip-file
UNCOV
2
import os
×
UNCOV
3
from datetime import datetime
×
UNCOV
4
import asyncio
×
UNCOV
5
import pytest
×
UNCOV
6
import clickhouse_connect
×
7

UNCOV
8
from ...Model.Simulators.Coordinate import Coordinate
×
UNCOV
9
from ...Model.Simulators.Misurazione import Misurazione
×
UNCOV
10
from ...Model.Writers.KafkaWriter import KafkaWriter
×
UNCOV
11
from ...Model.Writers.kafkaAdapter.KafkaConfluentAdapter import KafkaConfluentAdapter
×
UNCOV
12
from ...Model.AdapterMisurazione import AdapterMisurazione
×
UNCOV
13
from ...Model.Simulators.SensorTypes import SensorTypes
×
14

UNCOV
15
KAFKA_HOST = os.environ.get("KAFKA_HOST", "kafka")
×
UNCOV
16
KAFKA_PORT = os.environ.get("KAFKA_PORT", "9092")
×
UNCOV
17
topic_tmp = "temperature"
×
UNCOV
18
topic_umd = "umidity"
×
UNCOV
19
table_to_test = "healthScore"
×
20

UNCOV
21
@pytest.fixture(scope='module')
×
UNCOV
22
def clickhouse_client():
×
UNCOV
23
    client = clickhouse_connect.get_client(host='clickhouse', port=8123, database="innovacity")
×
UNCOV
24
    yield client
×
UNCOV
25
    client.close()
×
26

UNCOV
27
@pytest.fixture(scope='module')
×
UNCOV
28
def kafka_writer_tmp():
×
UNCOV
29
    adapter_kafka_tmp = KafkaConfluentAdapter(topic_tmp, KAFKA_HOST, KAFKA_PORT)
×
UNCOV
30
    kafka_writer_tmp = KafkaWriter(adapter_kafka_tmp)
×
UNCOV
31
    yield kafka_writer_tmp
×
32

UNCOV
33
@pytest.fixture(scope='module')
×
UNCOV
34
def kafka_writer_umd():
×
UNCOV
35
    adapter_kafka_umd = KafkaConfluentAdapter(topic_umd, KAFKA_HOST, KAFKA_PORT)
×
UNCOV
36
    kafka_writer_umd = KafkaWriter(adapter_kafka_umd)
×
UNCOV
37
    yield kafka_writer_umd
×
38

UNCOV
39
@pytest.mark.asyncio
×
UNCOV
40
async def test_heatlh_score_integration(clickhouse_client,kafka_writer_tmp,kafka_writer_umd):
×
UNCOV
41
    try:
×
UNCOV
42
        timestamp = datetime.now()
×
43

UNCOV
44
        tmp_sensor_data = [
×
45
            {"id": "HS_1_tmp","cella":"HSTestCell","timestamp":timestamp,"value": 25,"longitude": 11.859271,"latitude": 45.39214,"type": SensorTypes.TEMPERATURE.value},
46
            {"id": "HS_1_tmp","cella":"HSTestCell","timestamp":timestamp,"value": 25,"longitude": 11.859271,"latitude": 45.39214,"type": SensorTypes.TEMPERATURE.value},
47
            {"id": "HS_2_tmp","cella":"HSTestCell","timestamp":timestamp,"value": 25,"longitude": 11.859271,"latitude": 45.39214,"type": SensorTypes.TEMPERATURE.value}
48
        ]
49

UNCOV
50
        umd_sensor_data = [
×
51
            {"id": "HS_1_umd","cella":"HSTestCell","timestamp":timestamp,"value": 50,"longitude": 11.859271,"latitude": 45.39214,"type": SensorTypes.HUMIDITY.value},
52
            {"id": "HS_1_umd","cella":"HSTestCell","timestamp":timestamp,"value": 50,"longitude": 11.859271,"latitude": 45.39214,"type": SensorTypes.HUMIDITY.value},
53
            {"id": "HS_2_umd","cella":"HSTestCell","timestamp":timestamp,"value": 50,"longitude": 11.859271,"latitude": 45.39214,"type": SensorTypes.HUMIDITY.value}
54
        ]
55

UNCOV
56
        for data in tmp_sensor_data:
×
UNCOV
57
            misurazione = AdapterMisurazione(
×
58
                Misurazione(data["timestamp"], data["value"], data["type"], Coordinate(data["latitude"],data["longitude"]), data["id"], data["cella"]))
UNCOV
59
            kafka_writer_tmp.write(misurazione)
×
60

UNCOV
61
        for data in umd_sensor_data:
×
UNCOV
62
            misurazione = AdapterMisurazione(
×
63
                Misurazione(data["timestamp"], data["value"], data["type"], Coordinate(data["latitude"],data["longitude"]), data["id"], data["cella"]))
UNCOV
64
            kafka_writer_umd.write(misurazione)
×
65

UNCOV
66
        kafka_writer_tmp.flush_kafka_producer()
×
UNCOV
67
        kafka_writer_umd.flush_kafka_producer()
×
68

UNCOV
69
        await asyncio.sleep(10)
×
70

UNCOV
71
        expected_result = 0
×
UNCOV
72
        result = clickhouse_client.query(f"SELECT * FROM innovacity.{table_to_test} order by timestamp desc")
×
UNCOV
73
        assert  float(result.result_rows[0][2]) == expected_result
×
74

UNCOV
75
    except Exception as e:
×
UNCOV
76
        pytest.fail(f"Failed to connect to ClickHouse database: {e}")
×
77

STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc