• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tfcollins / telemetry / 10495659905

21 Aug 2024 06:45PM UTC coverage: 34.158% (-24.1%) from 58.234%
10495659905

push

github

tfcollins
Put exceptions back for missing backends

Signed-off-by: Travis F. Collins <travis.collins@analog.com>

0 of 2 new or added lines in 1 file covered. (0.0%)

211 existing lines in 13 files now uncovered.

428 of 1253 relevant lines covered (34.16%)

0.34 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/telemetry/dev/core.py
1
"""Core features for development board telemetry."""
2

3
import datetime
×
4
import os
×
5
from typing import List
×
6

7
import pymongo
×
8
from minio import Minio
×
9
import yaml
×
10

11

12
class Core:
    """Shared connection setup for development board telemetry.

    Resolves the MongoDB and Minio server settings for one project (from a
    YAML config file, explicit overrides, or both) and creates a client for
    each backend.

    Attributes set during construction:
        mongo / client: the ``pymongo.MongoClient`` built by ``setup_mongo``.
        collection: the MongoDB collection named in the Mongo settings.
        minio: the ``Minio`` client built by ``setup_minio``.
    """

    def __init__(
        self,
        project_type,
        configfilename=None,
        configs: dict = None,
    ):
        """Resolve backend settings and connect to MongoDB and Minio.

        Args:
            project_type: Value matched against each entry's ``project`` key
                in the config file to select which servers to use.
            configfilename: Optional path to a YAML config file.
            configs: Optional overrides keyed by ``"mongo"`` / ``"minio"``.
                Each value is a dict merged on top of the settings read from
                the file; override values take precedence.

        Raises:
            Exception: If the config file does not exist, if no settings are
                available for a backend, or if a backend's settings are
                missing a required field.
        """
        # Avoid a shared mutable default; None means "no overrides".
        overrides = configs or {}

        extracted_config = {"mongo": None, "minio": None}
        if configfilename:
            extracted_config.update(
                self._load_project_servers(configfilename, project_type)
            )

        # Merge configs where configs input takes precedence. A backend that
        # is absent from the file starts from an empty dict so overrides alone
        # are enough to configure it.
        for backend in extracted_config:
            if backend in overrides:
                extracted_config[backend] = {
                    **(extracted_config[backend] or {}),
                    **overrides[backend],
                }

        # Both keys always exist in extracted_config, so test the values:
        # a backend is "missing" when nothing populated its entry.
        if not extracted_config["mongo"]:
            raise Exception("No MongoDB configuration found")
        self.mongo = self.setup_mongo(extracted_config["mongo"])

        if not extracted_config["minio"]:
            raise Exception("No Minio configuration found")
        self.minio = self.setup_minio(extracted_config["minio"])

    @staticmethod
    def _load_project_servers(configfilename, project_type):
        """Return ``{"mongo": ..., "minio": ...}`` entries found in the file.

        Only servers listed under projects whose ``project`` value equals
        ``project_type`` are considered; servers without a ``type`` key are
        skipped. Backends that are not present are omitted from the result.
        When several servers share a type, the last one listed wins.

        Raises:
            Exception: If ``configfilename`` is not an existing file.
        """
        if not os.path.isfile(configfilename):
            raise Exception(f"Config file {configfilename} does not exist")

        with open(configfilename) as f:
            # NOTE(review): FullLoader can construct more than plain data
            # types; if this file can come from an untrusted source, switch
            # to yaml.safe_load.
            # An empty file parses to None, so fall back to an empty mapping.
            config = yaml.load(f, Loader=yaml.FullLoader) or {}

        found = {}
        for key in config:
            project = config[key]
            if not isinstance(project, dict) or "project" not in project:
                continue
            if project["project"] != project_type:
                continue
            for server in project.get("servers") or []:
                if not isinstance(server, dict) or "type" not in server:
                    continue
                if server["type"] in ("mongo", "minio"):
                    found[server["type"]] = server
        return found

    @staticmethod
    def _require_fields(config, required_fields, label):
        """Raise if ``config`` lacks any of ``required_fields``.

        ``label`` is the backend name used in the error message. A ``None``
        config is treated as empty so the first required field is reported.
        """
        present = {} if config is None else config
        for name in required_fields:
            if name not in present:
                raise Exception(f"{label} config missing field {name}")

    def setup_mongo(self, config):
        """Setup MongoDB connection.

        Args:
            config: Mapping with ``username``, ``password``, ``address``,
                ``port``, ``database`` and ``collection``.

        Returns:
            The ``pymongo.MongoClient``; it is also stored on ``self.client``
            and the selected collection on ``self.collection``.

        Raises:
            Exception: If a required field is missing or the client cannot be
                created (the underlying error is chained as the cause).
        """
        self._require_fields(
            config,
            ("username", "password", "address", "port", "database", "collection"),
            "Mongo",
        )

        # NOTE(review): PyMongo documents that reserved characters in the
        # username/password must be percent-escaped; values are inserted
        # as-is here - confirm configs are already escaped.
        cmd = f"mongodb://{config['username']}:{config['password']}@{config['address']}:{config['port']}/"

        # The URI is always built with the "mongodb://" scheme, so there is no
        # alternate scheme to fall back to; fail once and keep the cause.
        try:
            self.client = pymongo.MongoClient(cmd)
        except Exception as err:
            raise Exception("Unable to connect to MongoDB") from err

        db = self.client[config["database"]]
        self.collection = db[config["collection"]]

        return self.client

    def setup_minio(self, config):
        """Setup Minio connection.

        Args:
            config: Mapping with ``address``, ``port``, ``access_key`` and
                ``secret_key``.

        Returns:
            A ``Minio`` client for ``address:port`` created with
            ``secure=False``.

        Raises:
            Exception: If a required field is missing.
        """
        self._require_fields(
            config, ("address", "port", "access_key", "secret_key"), "Minio"
        )
        address = f"{config['address']}:{config['port']}"
        return Minio(
            address,
            access_key=config["access_key"],
            secret_key=config["secret_key"],
            secure=False,
        )
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc