• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

ByteOps-swe / MVP / 8229813241

11 Mar 2024 08:32AM UTC coverage: 79.004% (+0.2%) from 78.799%
8229813241

push

github

web-flow
Merge pull request #4 from ByteOps-swe/Action-test

Action test

129 of 159 new or added lines in 10 files covered. (81.13%)

1460 of 1848 relevant lines covered (79.0%)

0.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.63
/PythonSensorsSimulator/Test/System_Test/test_Clickhouse_electrical_table.py
1
# pylint: skip-file
2
import os
1✔
3
from datetime import datetime
1✔
4
import asyncio
1✔
5
import pytest
1✔
6
import clickhouse_connect
1✔
7

8
from ...Model.Simulators.Coordinate import Coordinate
1✔
9
from ...Model.Simulators.Misurazione import Misurazione
1✔
10
from ...Model.Writers.KafkaWriter import KafkaWriter
1✔
11
from ...Model.Writers.kafkaAdapter.KafkaConfluentAdapter import KafkaConfluentAdapter
1✔
12
from ...Model.AdapterMisurazione import AdapterMisurazione
1✔
13

14
# Kafka endpoint: taken from the environment when set, otherwise these
# defaults are used.
KAFKA_HOST = os.getenv("KAFKA_HOST", "kafka")
KAFKA_PORT = os.getenv("KAFKA_PORT", "9092")

# Name handed to the Kafka adapter as its topic, and name of the table
# queried in the `innovacity` database to verify the outcome.
test_topic = "electricalFault"
table_to_test = "electricalFault"
18

19
@pytest.fixture(scope='module')
def clickhouse_client():
    """Provide one ClickHouse client shared by all tests in this module.

    The host and port default to ``clickhouse`` and ``8123``; they can be
    overridden with the ``CLICKHOUSE_HOST`` and ``CLICKHOUSE_PORT``
    environment variables, in the same way the Kafka endpoint of this file
    is configured.

    Yields:
        The client returned by ``clickhouse_connect.get_client`` for the
        ``innovacity`` database. It is closed once the module's tests are done.
    """
    client = clickhouse_connect.get_client(
        host=os.environ.get("CLICKHOUSE_HOST", "clickhouse"),
        # Environment values are strings; the original passed an int port.
        port=int(os.environ.get("CLICKHOUSE_PORT", "8123")),
        database="innovacity",
    )
    yield client
    client.close()
24

25
@pytest.fixture
def kafka_writer():
    """Yield a ``KafkaWriter`` for the test.

    The writer wraps a ``KafkaConfluentAdapter`` constructed from the
    module-level ``test_topic``, ``KAFKA_HOST`` and ``KAFKA_PORT`` values.
    Nothing is torn down after the test.
    """
    writer = KafkaWriter(
        KafkaConfluentAdapter(test_topic, KAFKA_HOST, KAFKA_PORT)
    )
    yield writer
30

31
@pytest.mark.asyncio
async def test_outOfBound_misurazione_electrical(clickhouse_client, kafka_writer):
    """Check that only in-range electrical readings end up in ClickHouse.

    Three measurements (values 0, 1 and 2) are written through
    ``kafka_writer``. After a fixed wait, the table named by
    ``table_to_test`` is queried for each of them: readings whose value lies
    within ``[low_bound_limit, upper_bound_limit]`` must be stored with all
    their fields intact, the others must be absent.

    Args:
        clickhouse_client: Client used to query the ``innovacity`` database.
        kafka_writer: Writer used to publish the measurements.

    Only a failing ClickHouse query is reported as a connection problem;
    assertion failures and Kafka errors propagate unchanged so that pytest
    shows their real cause.
    """
    timestamp = datetime.now()
    low_bound_limit = 0
    upper_bound_limit = 1
    sensor_data = [
        {"id": "Id_1_electrical_correct_ofb", "cella": "Arcella", "timestamp": timestamp, "value": 0, "longitude": 11.859271, "latitude": 45.39214, "type": "electricalFault"},
        {"id": "Id_2_electrical_correct_ofb", "cella": "Arcella", "timestamp": timestamp, "value": 1, "longitude": 11.859271, "latitude": 45.39214, "type": "electricalFault"},
        {"id": "Id_1_electrical_error_ofb", "cella": "Arcella", "timestamp": timestamp, "value": 2, "longitude": 11.859271, "latitude": 45.39214, "type": "electricalFault"},
    ]

    for data in sensor_data:
        misurazione = AdapterMisurazione(
            Misurazione(
                data["timestamp"],
                data["value"],
                data["type"],
                Coordinate(data["latitude"], data["longitude"]),
                data["id"],
                data["cella"],
            )
        )
        kafka_writer.write(misurazione)

    kafka_writer.flush_kafka_producer()
    # NOTE(review): presumably the Kafka -> ClickHouse ingestion is
    # asynchronous and needs time to complete; the 5 s value is kept from the
    # original test - confirm it is sufficient on CI.
    await asyncio.sleep(5)

    for data in sensor_data:
        # The interpolated values all come from the literals above, never
        # from external input.
        query = f"SELECT * FROM innovacity.{table_to_test} where ID_sensore ='{data['id']}' and timestamp = '{data['timestamp']}' LIMIT 1"
        try:
            result = clickhouse_client.query(query)
        except Exception as e:
            # Keep the try body to the query alone so that assertion
            # failures below are never mislabelled as connection errors.
            pytest.fail(f"Failed to connect to ClickHouse database: {e}")

        rows = result.result_rows
        if low_bound_limit <= data["value"] <= upper_bound_limit:
            assert rows, f"no row stored for in-range sensor {data['id']}"
            row = rows[0]
            assert row[0] == data["id"]
            assert row[1] == data["cella"]
            # Compare at one-second resolution (first 19 characters).
            assert str(data["timestamp"])[:19] == str(row[2])[:19]
            assert float(row[3]) == data["value"]
            # Coordinates are floats: compare with a tolerance and against
            # the values that were actually sent.
            assert row[4] == pytest.approx(data["latitude"])
            assert row[5] == pytest.approx(data["longitude"])
        else:
            assert not rows, f"out-of-range sensor {data['id']} was stored"
66

STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc