• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

ByteOps-swe / MVP / 8490927449

30 Mar 2024 12:57PM UTC coverage: 42.353% (-51.8%) from 94.161%
8490927449

push

github

web-flow
Merge pull request #39 from ByteOps-swe/Barutta02-patch-2

Tolto test

27 of 126 branches covered (21.43%)

Branch coverage included in aggregate %.

1 of 1 new or added line in 1 file covered. (100.0%)

1143 existing lines in 77 files now uncovered.

981 of 2254 relevant lines covered (43.52%)

0.44 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

29.73
/PythonSensorsSimulator/Test/IntegrationTest/TI_clickhouse_data.py
1
# pylint: skip-file
2
import os
1✔
3
from datetime import datetime
1✔
4
import asyncio
1✔
5
import pytest
1✔
6
import clickhouse_connect
1✔
7

8
from ...Model.Simulators.coordinate import coordinate
1✔
9
from ...Model.Simulators.misurazione import misurazione
1✔
10
from ...Model.Writers.kafka_writer import kafka_writer
1✔
11
from ...Model.Writers.KafkaAdapter.kafka_confluent_adapter import kafka_confluent_adapter
1✔
12
from ...Model.adapter_misurazione import adapter_misurazione
1✔
13

14
# Kafka broker endpoint. Each value is read from the environment and falls
# back to the default given here when the variable is unset.
KAFKA_HOST = os.getenv("KAFKA_HOST", "kafka")
KAFKA_PORT = os.getenv("KAFKA_PORT", "9092")

# Kafka topic handed to the Kafka adapter by the `kafka_write` fixture.
test_topic = "test"
# Table (queried as `innovacity.<table>`) that the tests read back from.
table_to_test = "test"
1✔
18

19
@pytest.fixture(scope='module')
def clickhouse_client():
    """Yield a ClickHouse client shared by all tests in this module.

    The endpoint defaults to ``clickhouse:8123`` and can be overridden with
    the ``CLICKHOUSE_HOST`` / ``CLICKHOUSE_PORT`` environment variables, the
    same way the Kafka endpoint is configured in this file. The client is
    bound to the ``innovacity`` database and is closed on teardown.
    """
    host = os.environ.get("CLICKHOUSE_HOST", "clickhouse")
    # Environment values are strings; convert once so the port is passed as
    # an int, exactly like the previous hard-coded 8123.
    port = int(os.environ.get("CLICKHOUSE_PORT", "8123"))
    client = clickhouse_connect.get_client(host=host, port=port, database="innovacity")
    try:
        yield client
    finally:
        # Release the connection even if the fixture generator is closed early.
        client.close()
×
24

25
@pytest.fixture
def kafka_write():
    """Yield a ``kafka_writer`` wired to ``test_topic`` on the configured broker.

    The writer wraps a ``kafka_confluent_adapter`` built from ``test_topic``,
    ``KAFKA_HOST`` and ``KAFKA_PORT``. No teardown is performed here.
    """
    writer = kafka_writer(
        kafka_confluent_adapter(test_topic, KAFKA_HOST, KAFKA_PORT)
    )
    yield writer
×
30

31
@pytest.mark.asyncio
async def test_1_misurazione(clickhouse_client, kafka_write):
    """Write one measurement to Kafka and check it becomes readable in ClickHouse.

    The measurement is written and flushed through ``kafka_write``; the test
    then polls ClickHouse (one immediate query plus retries every 0.5 s for
    up to 7 s) for a row with the same sensor id and timestamp.

    No blanket ``except`` wraps the body: assertion failures and
    connection errors are reported by pytest with their own traceback
    instead of being relabelled as a ClickHouse connection failure.
    """
    timestamp = datetime.now()
    measure = adapter_misurazione(
        misurazione(timestamp, 4001, "Temperature", coordinate(45.39214, 11.859271), "Id_1_mis_test", "Arcella1"))
    kafka_write.write(measure)
    kafka_write.flush_kafka_producer()

    query = f"SELECT * FROM innovacity.{table_to_test} where ID_sensore ='Id_1_mis_test' and timestamp = '{str(timestamp)}' LIMIT 1"

    max_seconds_to_wait = 7
    intervallo_sleep = 0.5
    max_retries = int(max_seconds_to_wait / intervallo_sleep)

    # Poll until the row shows up or the retry budget is exhausted.
    result = clickhouse_client.query(query)
    retries = 0
    while not result.result_rows and retries < max_retries:
        await asyncio.sleep(intervallo_sleep)
        result = clickhouse_client.query(query)
        retries += 1

    assert result.result_rows, (
        f"no row for sensor 'Id_1_mis_test' appeared in ClickHouse within {max_seconds_to_wait}s"
    )
    # Column 3 is compared against the value that was sent
    # (presumably the measurement value column - confirm against the table schema).
    assert float(result.result_rows[0][3]) == 4001
×
55

56
@pytest.mark.asyncio
async def test_multiple_misurazioni(clickhouse_client, kafka_write):
    """Write 100 measurements to Kafka and check all of them reach ClickHouse.

    Values ``5001 .. 5100`` are written with increasing timestamps and
    flushed in one go. ClickHouse is then polled (one immediate query plus
    retries every 0.5 s for up to 10 s) until all rows are visible. The rows
    are requested newest-first, so they are compared against the sent data
    in reverse order.

    No blanket ``except`` wraps the body, so pytest reports the real
    assertion or connection error. The row count is asserted explicitly so a
    polling timeout fails with a clear message rather than an ``IndexError``.
    """
    num_messages = 100
    starting_value = 5001
    timestamps = []
    for i in range(num_messages):
        timestamp = datetime.now()
        timestamps.append(timestamp)
        measure = adapter_misurazione(
            misurazione(timestamp, starting_value + i, "Temperature", coordinate(45.39214, 11.859271), "Id_multi_mis_test", "ArcellaTest"))
        kafka_write.write(measure)
    kafka_write.flush_kafka_producer()

    query = f"SELECT * FROM innovacity.{table_to_test} where ID_sensore ='Id_multi_mis_test'  and timestamp >= '{timestamps[0]}' ORDER BY (timestamp,value) DESC LIMIT {num_messages}"

    max_seconds_to_wait = 10
    intervallo_sleep = 0.5
    max_retries = int(max_seconds_to_wait / intervallo_sleep)

    # Poll until every message is visible or the retry budget is exhausted.
    result = clickhouse_client.query(query)
    retries = 0
    while len(result.result_rows) < num_messages and retries < max_retries:
        await asyncio.sleep(intervallo_sleep)
        result = clickhouse_client.query(query)
        retries += 1

    rows = result.result_rows
    assert len(rows) == num_messages, (
        f"expected {num_messages} rows in ClickHouse within {max_seconds_to_wait}s, got {len(rows)}"
    )

    # Rows come back in descending order, so walk the sent data backwards.
    expected_values = range(starting_value, starting_value + num_messages)
    for row, expected_value, expected_timestamp in zip(
            rows, reversed(expected_values), reversed(timestamps)):
        assert expected_value == float(row[3])
        assert expected_timestamp == row[2]
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc