• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

ByteOps-swe / MVP / 8490927449

30 Mar 2024 12:57PM UTC coverage: 42.353% (-51.8%) from 94.161%
8490927449

push

github

web-flow
Merge pull request #39 from ByteOps-swe/Barutta02-patch-2

Tolto test

27 of 126 branches covered (21.43%)

Branch coverage included in aggregate %.

1 of 1 new or added line in 1 file covered. (100.0%)

1143 existing lines in 77 files now uncovered.

981 of 2254 relevant lines covered (43.52%)

0.44 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

34.48
/PythonSensorsSimulator/Test/SystemTest/TS_json_format.py
1
# pylint: skip-file
2
import os
1✔
3
from datetime import datetime
1✔
4
from unittest.mock import Mock
1✔
5
import pytest
1✔
6
import asyncio
1✔
7

8
import clickhouse_connect
1✔
9
from ...Model.Writers.kafka_writer import kafka_writer
1✔
10
from ...Model.Writers.KafkaAdapter.kafka_confluent_adapter import kafka_confluent_adapter
1✔
11

12
# Kafka broker location; each value can be overridden through the environment
# variable of the same name.
KAFKA_HOST = os.getenv("KAFKA_HOST", "kafka")
KAFKA_PORT = os.getenv("KAFKA_PORT", "9092")

# Kafka topic handed to the adapter, and ClickHouse table queried by the tests.
test_topic = "test"
table_to_test = "test"
17

18
@pytest.fixture(scope='module')
def clickhouse_client():
    """Provide a module-scoped ClickHouse client.

    Opens a client against host ``clickhouse`` on port 8123 with the
    ``innovacity`` database selected, yields it to the tests, and closes it
    once the fixture is torn down.
    """
    connection = clickhouse_connect.get_client(
        database="innovacity",
        port=8123,
        host='clickhouse',
    )
    yield connection
    # Runs at fixture teardown.
    connection.close()
23

24
@pytest.fixture
def kafka_write():
    """Provide a ``kafka_writer`` for the tests.

    The writer wraps a ``kafka_confluent_adapter`` constructed from
    ``test_topic``, ``KAFKA_HOST`` and ``KAFKA_PORT``.
    """
    writer = kafka_writer(
        kafka_confluent_adapter(test_topic, KAFKA_HOST, KAFKA_PORT)
    )
    yield writer
29
    
30
@pytest.mark.asyncio
async def test_missing_data_field(clickhouse_client, kafka_write):
    """Check that a measurement without the ``cella`` field is not stored.

    A complete measurement and one lacking ``cella`` are both written through
    ``kafka_write``. After a pause, the complete one must be retrievable from
    ClickHouse while the incomplete one must not be.

    Errors are left to propagate so that pytest reports the real cause
    (assertion failure, empty result, connection problem) instead of a
    single generic message.

    Args:
        clickhouse_client: ClickHouse client fixture used for the queries.
        kafka_write: Writer fixture used to send the measurements.
    """
    complete_measurement = Mock()
    complete_measurement.to_json.return_value = {
        "timestamp": "2024-03-05 12:30:00.000000",
        "value": 25.50,
        "type": "tipo",
        "latitude": 123.45,
        "longitude": 67.89,
        "ID_sensore": "id_json_format_correct1",
        "cella": "cella",
    }
    # Same payload minus "cella". The sensor id must match the one queried
    # below, otherwise the negative assertion can never fail.
    incomplete_measurement = Mock()
    incomplete_measurement.to_json.return_value = {
        "timestamp": "2024-03-05 12:30:00.000000",
        "value": 25.50,
        "type": "tipo",
        "latitude": 123.45,
        "longitude": 67.89,
        "ID_sensore": "id_json_format_wrong1",
    }

    # Both messages have to be sent: the "not stored" check is meaningless
    # unless the malformed message actually reaches the pipeline.
    kafka_write.write(complete_measurement)
    kafka_write.write(incomplete_measurement)
    kafka_write.flush_kafka_producer()
    # Presumably leaves time for the message to travel from Kafka into
    # ClickHouse - TODO confirm that 10 seconds is the intended delay.
    await asyncio.sleep(10)

    stored = clickhouse_client.query(
        f"SELECT * FROM innovacity.{table_to_test} where ID_sensore = 'id_json_format_correct1' LIMIT 1"
    )
    assert stored.result_rows, "the complete measurement was not stored"
    # Column 3 of the row is compared against the value that was written.
    assert float(stored.result_rows[0][3]) == 25.5

    rejected = clickhouse_client.query(
        f"SELECT * FROM innovacity.{table_to_test} where ID_sensore = 'id_json_format_wrong1' LIMIT 1"
    )
    assert not rejected.result_rows, (
        f"measurement without 'cella' was stored: {rejected.result_rows}"
    )
63
        
64
@pytest.mark.asyncio
async def test_wrong_field_order(clickhouse_client, kafka_write):
    """Check that a measurement with reordered fields is not stored.

    A measurement with the usual key order and one with ``latitude``,
    ``type``, ``ID_sensore`` and ``longitude`` shuffled are both written
    through ``kafka_write``. After a pause, the first must be retrievable
    from ClickHouse while the second must not be.

    Errors are left to propagate so that pytest reports the real cause
    (assertion failure, empty result, connection problem) instead of a
    single generic message.

    Args:
        clickhouse_client: ClickHouse client fixture used for the queries.
        kafka_write: Writer fixture used to send the measurements.
    """
    ordered_measurement = Mock()
    ordered_measurement.to_json.return_value = {
        "timestamp": "2024-03-05 12:30:00.000000",
        "value": 25.50,
        "type": "tipo",
        "latitude": 123.45,
        "longitude": 67.89,
        "ID_sensore": "id_json_format_correct2",
        "cella": "cella",
    }
    # Same fields, different key order.
    shuffled_measurement = Mock()
    shuffled_measurement.to_json.return_value = {
        "timestamp": "2024-03-05 12:30:00.000000",
        "value": 25.50,
        "latitude": 123.45,
        "type": "tipo",
        "ID_sensore": "id_json_format_wrong2",
        "longitude": 67.89,
        "cella": "cella",
    }

    # Both messages have to be sent: the "not stored" check is meaningless
    # unless the reordered message actually reaches the pipeline.
    # NOTE(review): whether a reordered payload is rejected depends on the
    # ingestion pipeline, which is not visible here - confirm the expectation.
    kafka_write.write(ordered_measurement)
    kafka_write.write(shuffled_measurement)
    kafka_write.flush_kafka_producer()
    # Presumably leaves time for the message to travel from Kafka into
    # ClickHouse - TODO confirm that 10 seconds is the intended delay.
    await asyncio.sleep(10)

    stored = clickhouse_client.query(
        f"SELECT * FROM innovacity.{table_to_test} where ID_sensore = 'id_json_format_correct2' LIMIT 1"
    )
    assert stored.result_rows, "the correctly ordered measurement was not stored"
    # Column 3 of the row is compared against the value that was written.
    assert float(stored.result_rows[0][3]) == 25.5

    rejected = clickhouse_client.query(
        f"SELECT * FROM innovacity.{table_to_test} where ID_sensore = 'id_json_format_wrong2' LIMIT 1"
    )
    assert not rejected.result_rows, (
        f"measurement with reordered fields was stored: {rejected.result_rows}"
    )
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc