• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

atlanticwave-sdx / sdx-controller / 17130547606

21 Aug 2025 02:51PM UTC coverage: 55.206% (-0.3%) from 55.53%
17130547606

Pull #481

github

web-flow
Merge 49abd2cb0 into 36b845ee6
Pull Request #481: Fixed documents key for collection historical_connections that must be string

1 of 1 new or added line in 1 file covered. (100.0%)

7 existing lines in 1 file now uncovered.

1193 of 2161 relevant lines covered (55.21%)

1.1 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

77.27
/sdx_controller/utils/db_utils.py
1
import logging
2✔
2
import os
2✔
3
import time
2✔
4
from urllib.parse import urlparse
2✔
5

6
import pymongo
2✔
7
from sdx_datamodel.constants import MongoCollections
2✔
8

9
pymongo_logger = logging.getLogger("pymongo")
2✔
10
pymongo_logger.setLevel(logging.INFO)
2✔
11

12

13
def obfuscate_password_in_uri(uri: str) -> str:
2✔
14
    """
15
    Replace password field in URIs with a `*`, for logging.
16
    """
17
    parts = urlparse(uri)
2✔
18
    if parts.password:
2✔
19
        return uri.replace(parts.password, "*")
2✔
20
    else:
21
        return uri
×
22

23

24
class DbUtils(object):
2✔
25
    def __init__(self):
2✔
26
        self.db_name = os.environ.get("DB_NAME")
2✔
27
        if self.db_name is None:
2✔
28
            raise Exception("DB_NAME environment variable is not set")
29

30
        # self.config_table_name = os.environ.get("DB_CONFIG_TABLE_NAME")
31
        # if self.config_table_name is None:
32
        #     raise Exception("DB_CONFIG_TABLE_NAME environ variable is not set")
33

34
        mongo_user = os.getenv("MONGO_USER") or "guest"
2✔
35
        mongo_pass = os.getenv("MONGO_PASS") or "guest"
2✔
36
        mongo_host = os.getenv("MONGO_HOST")
2✔
37
        mongo_port = os.getenv("MONGO_PORT") or 27017
2✔
38

39
        if mongo_host is None:
2✔
40
            mongo_connstring = os.getenv("MONGODB_CONNSTRING")
×
41
            if mongo_connstring is None:
×
42
                raise Exception("Neither MONGO_HOST nor MONGODB_CONNSTRING is set")
43
        else:
44
            mongo_connstring = (
2✔
45
                f"mongodb://{mongo_user}:{mongo_pass}@{mongo_host}:{mongo_port}/"
46
            )
47

48
        self.logger = logging.getLogger(__name__)
2✔
49
        self.logger.setLevel(logging.getLevelName(os.getenv("LOG_LEVEL", "DEBUG")))
2✔
50

51
        # Log DB URI, without a password.
52
        self.logger.info(f"[DB] Using {obfuscate_password_in_uri(mongo_connstring)}")
2✔
53

54
        self.mongo_client = pymongo.MongoClient(mongo_connstring)
2✔
55

56
    def initialize_db(self):
2✔
57
        """
58
        Creates collections if not exist in DB
59
        """
60
        self.logger.debug(f"Trying to load {self.db_name} from DB")
2✔
61

62
        if self.db_name not in self.mongo_client.list_database_names():
2✔
63
            self.logger.debug(f"No existing {self.db_name} from DB, creating table")
2✔
64
            self.sdxdb = self.mongo_client[self.db_name]
2✔
65
            self.logger.debug(f"DB {self.db_name} initialized")
2✔
66

67
        self.sdxdb = self.mongo_client[self.db_name]
2✔
68
        # config_col = self.sdxdb[self.config_table_name]
69
        for key, collection in MongoCollections.__dict__.items():
2✔
70
            if (
2✔
71
                not key.startswith("__")
72
                and collection not in self.sdxdb.list_collection_names()
73
            ):
74
                self.sdxdb.create_collection(collection)
2✔
75

76
        self.logger.debug(f"DB {self.db_name} initialized")
2✔
77

78
    def add_key_value_pair_to_db(self, collection, key, value, max_retries=3):
2✔
79
        """
80
        Adds or replaces a key-value pair in the database.
81
        """
82
        key = str(key)
2✔
83
        retry_count = 0
2✔
84

85
        while retry_count < max_retries:
2✔
86
            try:
2✔
87
                obj = self.read_from_db(collection, key)
2✔
88

89
                if obj is None:
2✔
90
                    # Document doesn't exist, create a new one
91
                    document = {key: value}
2✔
92
                    result = self.sdxdb[collection].insert_one(document)
2✔
93

94
                    if result.acknowledged and result.inserted_id:
2✔
95
                        return result
2✔
96
                    logging.error("Insert operation not acknowledged")
×
97

98
                else:
99
                    # Document exists, replace with new key-value pair
100
                    new_document = {key: value}
2✔
101
                    new_document["_id"] = obj["_id"]
2✔
102

103
                    query = {"_id": obj["_id"]}
2✔
104
                    result = self.sdxdb[collection].replace_one(query, new_document)
2✔
105

106
                    if result.acknowledged and result.modified_count == 1:
2✔
107
                        return result
2✔
108
                    logging.error(
×
109
                        f"Replace operation not successful: modified_count={result.modified_count}"
110
                    )
111

UNCOV
112
            except Exception as e:
×
UNCOV
113
                retry_count += 1
×
UNCOV
114
                if retry_count >= max_retries:
×
UNCOV
115
                    logging.error(
×
116
                        f"Failed to add key-value pair after {max_retries} attempts. Collection: {collection}, Key: {key}. Error: {str(e)}"
117
                    )
UNCOV
118
                    return None
×
119

UNCOV
120
                time.sleep(0.5 * (2**retry_count))
×
UNCOV
121
                logging.warning(
×
122
                    f"Retry {retry_count}/{max_retries} for adding key-value pair. Collection: {collection}, Key: {key}"
123
                )
124

125
        return None
×
126

127
    def update_field_in_json(self, collection, key, field_name, field_value):
2✔
128
        """
129
        Updates a single field in a JSON object.
130
        """
131
        key = str(key)
2✔
132

133
        try:
2✔
134
            # Update a nested field directly
135
            # Format: {key}.{field_name} targets a specific field within a JSON object
136
            update_query = {"$set": {f"{key}.{field_name}": field_value}}
2✔
137

138
            # Perform atomic update operation
139
            result = self.sdxdb[collection].update_one(
2✔
140
                {key: {"$exists": True}},  # Find document where the key exists
141
                update_query,
142
            )
143

144
            if result.matched_count == 0:
2✔
145
                logging.error(
×
146
                    f"Document with key '{key}' not found in collection '{collection}'"
147
                )
148
                return None
×
149

150
            return result
2✔
151

152
        except Exception as e:
×
153
            logging.error(
×
154
                f"Failed to update field. Collection: {collection}, Key: {key}, Field: {field_name}. Error: {str(e)}"
155
            )
156
            return None
×
157

158
    def read_from_db(self, collection, key):
2✔
159
        """
160
        Reads a document from the database using the specified key.
161
        """
162
        key = str(key)
2✔
163
        try:
2✔
164
            # Find document where the key exists and not marked as deleted
165
            result = self.sdxdb[collection].find_one(
2✔
166
                {"$expr": {"$getField": key}, "deleted": {"$ne": True}}
167
            )
168
            return result
2✔
169
        except Exception as e:
×
170
            logging.error(
×
171
                f"Error reading from database. Collection: {collection}, Key: {key}. Error: {str(e)}"
172
            )
173
            return None
×
174

175
    def get_value_from_db(self, collection, key):
2✔
176
        """
177
        Gets just the value for a specific key from the database.
178
        """
179
        document = self.read_from_db(collection, key)
2✔
180

181
        if document and key in document:
2✔
182
            return document[key]
2✔
183
        return None
2✔
184

185
    def get_all_entries_in_collection(self, collection):
2✔
186
        """
187
        Gets all entries in a Mongo collection
188
        """
189
        db_collection = self.sdxdb[collection]
2✔
190
        # MongoDB has an ObjectId for each item, so need to exclude the ObjectIds
191
        all_entries = db_collection.find({"deleted": {"$ne": True}}, {"_id": 0})
2✔
192
        return all_entries
2✔
193

194
    def mark_deleted(self, collection, key):
2✔
195
        """
196
        Marks an entry deleted
197
        """
198
        db_collection = self.sdxdb[collection]
2✔
199
        key = str(key)
2✔
200
        item_to_delete = self.read_from_db(collection, key)
2✔
201
        if item_to_delete is None:
2✔
202
            return False
2✔
203
        filter = {"_id": item_to_delete["_id"]}
×
204
        update = {"$set": {"deleted": True}}
×
205
        db_collection.update_one(filter, update)
×
206
        return True
×
207

208
    def delete_one_entry(self, collection, key):
2✔
209
        """
210
        Actually deletes one entry
211
        """
212
        key = str(key)
2✔
213
        db_collection = self.sdxdb[collection]
2✔
214
        db_collection.delete_one({key: {"$exists": True}})
2✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc