• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

atlanticwave-sdx / sdx-controller / 14672778579

25 Apr 2025 08:17PM UTC coverage: 56.051% (-0.09%) from 56.145%
14672778579

Pull #453

github

web-flow
Merge 63a6fd14f into 03aa84af5
Pull Request #453: Assign oxp_success_count to 0 only if it does not exist

30 of 54 new or added lines in 4 files covered. (55.56%)

1 existing line in 1 file now uncovered.

1181 of 2107 relevant lines covered (56.05%)

1.12 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.0
/sdx_controller/utils/db_utils.py
1
import logging
2✔
2
import os
2✔
3
import time
2✔
4
from urllib.parse import urlparse
2✔
5

6
import pymongo
2✔
7
from sdx_datamodel.constants import MongoCollections
2✔
8

9
pymongo_logger = logging.getLogger("pymongo")
2✔
10
pymongo_logger.setLevel(logging.INFO)
2✔
11

12

13
def obfuscate_password_in_uri(uri: str) -> str:
2✔
14
    """
15
    Replace password field in URIs with a `*`, for logging.
16
    """
17
    parts = urlparse(uri)
2✔
18
    if parts.password:
2✔
19
        return uri.replace(parts.password, "*")
2✔
20
    else:
21
        return uri
×
22

23

24
class DbUtils(object):
2✔
25
    def __init__(self):
2✔
26
        self.db_name = os.environ.get("DB_NAME")
2✔
27
        if self.db_name is None:
2✔
28
            raise Exception("DB_NAME environment variable is not set")
29

30
        # self.config_table_name = os.environ.get("DB_CONFIG_TABLE_NAME")
31
        # if self.config_table_name is None:
32
        #     raise Exception("DB_CONFIG_TABLE_NAME environ variable is not set")
33

34
        mongo_user = os.getenv("MONGO_USER") or "guest"
2✔
35
        mongo_pass = os.getenv("MONGO_PASS") or "guest"
2✔
36
        mongo_host = os.getenv("MONGO_HOST")
2✔
37
        mongo_port = os.getenv("MONGO_PORT") or 27017
2✔
38

39
        if mongo_host is None:
2✔
40
            mongo_connstring = os.getenv("MONGODB_CONNSTRING")
×
41
            if mongo_connstring is None:
×
42
                raise Exception("Neither MONGO_HOST nor MONGODB_CONNSTRING is set")
43
        else:
44
            mongo_connstring = (
2✔
45
                f"mongodb://{mongo_user}:{mongo_pass}@{mongo_host}:{mongo_port}/"
46
            )
47

48
        self.logger = logging.getLogger(__name__)
2✔
49
        self.logger.setLevel(logging.getLevelName(os.getenv("LOG_LEVEL", "DEBUG")))
2✔
50

51
        # Log DB URI, without a password.
52
        self.logger.info(f"[DB] Using {obfuscate_password_in_uri(mongo_connstring)}")
2✔
53

54
        self.mongo_client = pymongo.MongoClient(mongo_connstring)
2✔
55

56
    def initialize_db(self):
2✔
57
        self.logger.debug(f"Trying to load {self.db_name} from DB")
2✔
58

59
        if self.db_name not in self.mongo_client.list_database_names():
2✔
60
            self.logger.debug(f"No existing {self.db_name} from DB, creating table")
2✔
61
            self.sdxdb = self.mongo_client[self.db_name]
2✔
62
            self.logger.debug(f"DB {self.db_name} initialized")
2✔
63

64
        self.sdxdb = self.mongo_client[self.db_name]
2✔
65
        # config_col = self.sdxdb[self.config_table_name]
66
        for key, collection in MongoCollections.__dict__.items():
2✔
67
            if (
2✔
68
                not key.startswith("__")
69
                and collection not in self.sdxdb.list_collection_names()
70
            ):
71
                self.sdxdb.create_collection(collection)
2✔
72

73
        self.logger.debug(f"DB {self.db_name} initialized")
2✔
74

75
    def add_key_value_pair_to_db(self, collection, key, value, max_retries=3):
2✔
76
        """
77
        Adds or replaces a key-value pair in the database.
78
        """
79
        key = str(key)
2✔
80
        retry_count = 0
2✔
81

82
        while retry_count < max_retries:
2✔
83
            try:
2✔
84
                obj = self.read_from_db(collection, key)
2✔
85

86
                if obj is None:
2✔
87
                    # Document doesn't exist, create a new one
88
                    document = {key: value}
2✔
89
                    result = self.sdxdb[collection].insert_one(document)
2✔
90

91
                    if result.acknowledged and result.inserted_id:
2✔
92
                        return result
2✔
NEW
93
                    logging.error("Insert operation not acknowledged")
×
94

95
                else:
96
                    # Document exists, replace with new key-value pair
97
                    new_document = {key: value}
2✔
98
                    new_document["_id"] = obj["_id"]
2✔
99

100
                    query = {"_id": obj["_id"]}
2✔
101
                    result = self.sdxdb[collection].replace_one(query, new_document)
2✔
102

103
                    if result.acknowledged and result.modified_count == 1:
2✔
104
                        return result
2✔
NEW
105
                    logging.error(
×
106
                        f"Replace operation not successful: modified_count={result.modified_count}"
107
                    )
108

NEW
109
            except Exception as e:
×
NEW
110
                retry_count += 1
×
NEW
111
                if retry_count >= max_retries:
×
NEW
112
                    logging.error(
×
113
                        f"Failed to add key-value pair after {max_retries} attempts. Collection: {collection}, Key: {key}. Error: {str(e)}"
114
                    )
NEW
115
                    return None
×
116

NEW
117
                time.sleep(0.5 * (2**retry_count))
×
NEW
118
                logging.warning(
×
119
                    f"Retry {retry_count}/{max_retries} for adding key-value pair. Collection: {collection}, Key: {key}"
120
                )
121

NEW
122
        return None
×
123

124
    def update_field_in_json(self, collection, key, field_name, field_value):
2✔
125
        """
126
        Updates a single field in a JSON object.
127
        """
128
        key = str(key)
2✔
129

130
        try:
2✔
131
            # Update a nested field directly
132
            # Format: {key}.{field_name} targets a specific field within a JSON object
133
            update_query = {"$set": {f"{key}.{field_name}": field_value}}
2✔
134

135
            # Perform atomic update operation
136
            result = self.sdxdb[collection].update_one(
2✔
137
                {key: {"$exists": True}},  # Find document where the key exists
138
                update_query,
139
            )
140

141
            if result.matched_count == 0:
2✔
NEW
142
                logging.error(
×
143
                    f"Document with key '{key}' not found in collection '{collection}'"
144
                )
NEW
145
                return None
×
146

147
            return result
2✔
148

NEW
149
        except Exception as e:
×
NEW
150
            logging.error(
×
151
                f"Failed to update field. Collection: {collection}, Key: {key}, Field: {field_name}. Error: {str(e)}"
152
            )
NEW
153
            return None
×
154

155
    def read_from_db(self, collection, key):
2✔
156
        key = str(key)
2✔
157
        return self.sdxdb[collection].find_one(
2✔
158
            {key: {"$exists": 1}, "deleted": {"$ne": True}}
159
        )
160

161
    def get_all_entries_in_collection(self, collection):
2✔
162
        db_collection = self.sdxdb[collection]
2✔
163
        # MongoDB has an ObjectId for each item, so need to exclude the ObjectIds
164
        all_entries = db_collection.find({"deleted": {"$ne": True}}, {"_id": 0})
2✔
165
        return all_entries
2✔
166

167
    def mark_deleted(self, collection, key):
2✔
168
        db_collection = self.sdxdb[collection]
2✔
169
        key = str(key)
2✔
170
        item_to_delete = self.read_from_db(collection, key)
2✔
171
        if item_to_delete is None:
2✔
172
            return False
2✔
173
        filter = {"_id": item_to_delete["_id"]}
×
174
        update = {"$set": {"deleted": True}}
×
175
        db_collection.update_one(filter, update)
×
176
        return True
×
177

178
    def delete_one_entry(self, collection, key):
2✔
179
        key = str(key)
2✔
180
        db_collection = self.sdxdb[collection]
2✔
181
        db_collection.delete_one({key: {"$exists": True}})
2✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc