• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

atlanticwave-sdx / sdx-controller / 14601692800

22 Apr 2025 06:12PM UTC coverage: 56.08% (-0.07%) from 56.145%
14601692800

Pull #453

github

web-flow
Merge dd330cecd into 0e62a4c6d
Pull Request #453: Assign oxp_success_count to 0 only if it does not exist

19 of 31 new or added lines in 2 files covered. (61.29%)

1 existing line in 1 file now uncovered.

1176 of 2097 relevant lines covered (56.08%)

1.12 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.68
/sdx_controller/utils/db_utils.py
1
import logging
2✔
2
import os
2✔
3
import time
2✔
4
from urllib.parse import urlparse
2✔
5

6
import pymongo
2✔
7
from sdx_datamodel.constants import MongoCollections
2✔
8

9
# Set the "pymongo" logger's level to INFO, so DEBUG records it produces
# are not emitted.
pymongo_logger = logging.getLogger("pymongo")
2✔
10
pymongo_logger.setLevel(logging.INFO)
2✔
11

12

13
def obfuscate_password_in_uri(uri: str) -> str:
    """
    Replace the password field in a URI with a `*`, for logging.

    Only the password component of the ``user:password@host`` part is
    masked.  A plain ``str.replace`` of the password text would also
    rewrite any other place where the same characters occur (for example
    a username equal to the password, or a host name containing it).

    :param uri: URI that may carry ``user:password@`` credentials.
    :return: ``uri`` with the password replaced by ``*``; ``uri``
        unchanged when it has no (or an empty) password.
    """
    parts = urlparse(uri)
    if not parts.password:
        return uri

    # Split the netloc the same way urlparse derives username/password:
    # the last "@" separates credentials from host(s), and the first ":"
    # within the credentials separates username from password.
    userinfo, _, hostinfo = parts.netloc.rpartition("@")
    username, _, _ = userinfo.partition(":")
    masked_netloc = f"{username}:*@{hostinfo}"

    if parts.netloc in uri:
        # Substitute in the original text so the rest of the URI is
        # returned exactly as it was given.
        return uri.replace(parts.netloc, masked_netloc, 1)

    # The parser normalised the input (so netloc is not a literal
    # substring); rebuild from the parsed parts rather than risk
    # returning the password unmasked.
    return parts._replace(netloc=masked_netloc).geturl()
×
22

23

24
class DbUtils(object):
    """
    Helper around a MongoDB database that stores one key/value pair per
    document.

    Documents are looked up by the presence of a field named after the
    key.  Documents carrying ``deleted: True`` are treated as removed by
    the read helpers.
    """

    def __init__(self):
        """
        Read connection settings from the environment and create the
        MongoDB client.

        Environment variables read: ``DB_NAME`` (required), ``MONGO_USER``
        and ``MONGO_PASS`` (default ``guest``), ``MONGO_HOST``,
        ``MONGO_PORT`` (default 27017), ``MONGODB_CONNSTRING`` (used only
        when ``MONGO_HOST`` is unset) and ``LOG_LEVEL`` (default
        ``DEBUG``).

        :raises Exception: if ``DB_NAME`` is unset, or if neither
            ``MONGO_HOST`` nor ``MONGODB_CONNSTRING`` is set.
        """
        self.db_name = os.environ.get("DB_NAME")
        if self.db_name is None:
            raise Exception("DB_NAME environment variable is not set")

        # self.config_table_name = os.environ.get("DB_CONFIG_TABLE_NAME")
        # if self.config_table_name is None:
        #     raise Exception("DB_CONFIG_TABLE_NAME environ variable is not set")

        mongo_user = os.getenv("MONGO_USER") or "guest"
        mongo_pass = os.getenv("MONGO_PASS") or "guest"
        mongo_host = os.getenv("MONGO_HOST")
        mongo_port = os.getenv("MONGO_PORT") or 27017

        if mongo_host is None:
            # No host given: fall back to a complete connection string.
            mongo_connstring = os.getenv("MONGODB_CONNSTRING")
            if mongo_connstring is None:
                raise Exception("Neither MONGO_HOST nor MONGODB_CONNSTRING is set")
        else:
            # NOTE(review): user/password are interpolated verbatim; values
            # containing reserved URI characters presumably need to be
            # percent-encoded by whoever sets the variables -- confirm.
            mongo_connstring = (
                f"mongodb://{mongo_user}:{mongo_pass}@{mongo_host}:{mongo_port}/"
            )

        self.logger = logging.getLogger(__name__)
        self.logger.setLevel(logging.getLevelName(os.getenv("LOG_LEVEL", "DEBUG")))

        # Log DB URI, without a password.
        self.logger.info(f"[DB] Using {obfuscate_password_in_uri(mongo_connstring)}")

        self.mongo_client = pymongo.MongoClient(mongo_connstring)

    def initialize_db(self):
        """
        Bind ``self.sdxdb`` to the configured database and create every
        collection named in ``MongoCollections`` that does not exist yet.
        """
        self.logger.debug(f"Trying to load {self.db_name} from DB")

        if self.db_name not in self.mongo_client.list_database_names():
            self.logger.debug(f"No existing {self.db_name} from DB, creating table")
            self.logger.debug(f"DB {self.db_name} initialized")

        self.sdxdb = self.mongo_client[self.db_name]
        # config_col = self.sdxdb[self.config_table_name]

        # Ask the server for the collection list once rather than once per
        # MongoCollections attribute.
        existing = set(self.sdxdb.list_collection_names())
        for key, collection in MongoCollections.__dict__.items():
            if key.startswith("__") or collection in existing:
                continue
            self.sdxdb.create_collection(collection)
            # Remember it so a second attribute with the same value is
            # not created twice.
            existing.add(collection)

        self.logger.debug(f"DB {self.db_name} initialized")

    def add_key_value_pair_to_db(self, collection, key, value, max_retries=3):
        """
        Store ``{key: value}`` in ``collection``, replacing the existing
        non-deleted document for ``key`` if there is one.

        Every failed attempt -- an exception or a write that did not take
        effect -- counts against ``max_retries``; attempts are separated
        by an exponentially growing sleep.

        :param collection: name of the collection to write to.
        :param key: document key; converted with ``str()``.
        :param value: value to store under ``key``.
        :param max_retries: maximum number of attempts.
        :return: the pymongo insert/replace result on success, ``None``
            when all attempts failed (or ``max_retries`` is not positive).
        """
        key = str(key)
        attempt = 0

        while attempt < max_retries:
            try:
                existing = self.read_from_db(collection, key)

                if existing is None:
                    # Document doesn't exist, create a new one
                    result = self.sdxdb[collection].insert_one({key: value})
                    if result.acknowledged and result.inserted_id:
                        return result
                    failure = "Insert operation not acknowledged"
                else:
                    # Document exists, replace with new key-value pair
                    doc_id = existing["_id"]
                    result = self.sdxdb[collection].replace_one(
                        {"_id": doc_id}, {key: value, "_id": doc_id}
                    )
                    # A matched document means the stored state now equals
                    # the requested one.  modified_count is 0 when the new
                    # document is identical to the old one, so it must not
                    # be used as the success criterion.
                    if result.acknowledged and result.matched_count == 1:
                        return result
                    if result.acknowledged:
                        failure = (
                            "Replace operation not successful: "
                            f"matched_count={result.matched_count}, "
                            f"modified_count={result.modified_count}"
                        )
                    else:
                        failure = "Replace operation not acknowledged"

            except Exception as exc:
                failure = str(exc)

            # Count every failed attempt, not just the ones that raised;
            # otherwise a non-raising failure would loop forever.
            attempt += 1
            if attempt >= max_retries:
                self.logger.error(
                    f"Failed to add key-value pair after {max_retries} attempts. "
                    f"Collection: {collection}, Key: {key}. Error: {failure}"
                )
                return None

            time.sleep(0.5 * (2 ** attempt))
            self.logger.warning(
                f"Retry {attempt}/{max_retries} for adding key-value pair. "
                f"Collection: {collection}, Key: {key}"
            )

        return None

    def read_from_db(self, collection, key):
        """
        Return the first document in ``collection`` that has a field named
        ``str(key)`` and is not flagged ``deleted: True``, or ``None``.
        """
        key = str(key)
        return self.sdxdb[collection].find_one(
            {key: {"$exists": 1}, "deleted": {"$ne": True}}
        )

    def get_all_entries_in_collection(self, collection):
        """
        Return the ``find`` result for all documents in ``collection``
        that are not flagged ``deleted: True``, with ``_id`` excluded.
        """
        db_collection = self.sdxdb[collection]
        # MongoDB has an ObjectId for each item, so need to exclude the ObjectIds
        all_entries = db_collection.find({"deleted": {"$ne": True}}, {"_id": 0})
        return all_entries

    def mark_deleted(self, collection, key):
        """
        Soft-delete the document for ``key`` by setting ``deleted: True``.

        :return: ``False`` when no non-deleted document for ``key``
            exists, ``True`` after the update was issued.
        """
        db_collection = self.sdxdb[collection]
        key = str(key)
        item_to_delete = self.read_from_db(collection, key)
        if item_to_delete is None:
            return False
        db_collection.update_one(
            {"_id": item_to_delete["_id"]},
            {"$set": {"deleted": True}},
        )
        return True

    def delete_one_entry(self, collection, key):
        """
        Remove one document that has a field named ``str(key)`` from
        ``collection``.  The ``deleted`` flag is not part of the query.
        """
        key = str(key)
        db_collection = self.sdxdb[collection]
        db_collection.delete_one({key: {"$exists": True}})
2✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc