• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

sparkmicro / Ki-nTree / 12538415962

29 Dec 2024 08:17PM UTC coverage: 44.042% (-28.4%) from 72.409%
12538415962

push

github

web-flow
Merge pull request #270 from sparkmicro/dev

1.2.0

46 of 60 new or added lines in 7 files covered. (76.67%)

822 existing lines in 10 files now uncovered.

1227 of 2786 relevant lines covered (44.04%)

1.76 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

22.28
/kintree/database/inventree_interface.py
1
import copy
4✔
2

3
from ..config import settings
4✔
4
from ..common import part_tools, progress
4✔
5
from ..common.tools import cprint
4✔
6
from ..config import config_interface
4✔
7
from ..database import inventree_api
4✔
8
from ..search import search_api, automationdirect_api, digikey_api, mouser_api, element14_api, lcsc_api, jameco_api, tme_api
4✔
9

10
category_separator = '/'
4✔
11

12

13
def connect_to_server(timeout=5) -> bool:
4✔
14
    ''' Connect to InvenTree server using user settings '''
UNCOV
15
    connect = False
×
UNCOV
16
    settings.load_inventree_settings()
×
UNCOV
17
    if not settings.USERNAME:
×
18
        token = settings.PASSWORD
×
19
    else:
UNCOV
20
        token = ''
×
21

UNCOV
22
    try:
×
UNCOV
23
        connect = inventree_api.connect(server=settings.SERVER_ADDRESS,
×
24
                                        username=settings.USERNAME,
25
                                        password=settings.PASSWORD,
26
                                        proxies=settings.PROXIES,
27
                                        token=token,
28
                                        connect_timeout=timeout)
29
    except TimeoutError:
×
30
        pass
×
31

UNCOV
32
    if not connect:
×
33
        if not settings.SERVER_ADDRESS:
×
34
            cprint('[TREE]\tError connecting to InvenTree server: missing server address')
×
35
            return connect
×
36
        if not settings.USERNAME:
×
37
            cprint('[TREE]\tError connecting to InvenTree server: missing username')
×
38
            return connect
×
39
        if not settings.PASSWORD:
×
40
            cprint('[TREE]\tError connecting to InvenTree server: missing password')
×
41
            return connect
×
42
        cprint('[TREE]\tError connecting to InvenTree server: invalid address, username or password')
×
43
    else:
UNCOV
44
        env = [env_type.name for env_type in settings.Environment
×
45
               if env_type.value == settings.environment][0]
UNCOV
46
        cprint(f'[TREE]\tSuccessfully connected to InvenTree server (ENV={env})', silent=settings.SILENT)
×
47

UNCOV
48
    return connect
×
49

50

51
def category_tree(tree: str) -> str:
4✔
UNCOV
52
    import re
×
UNCOV
53
    find_prefix = re.match(r'^-+ (.+?)$', tree)
×
UNCOV
54
    if find_prefix:
×
UNCOV
55
        return find_prefix.group(1)
×
UNCOV
56
    return tree
×
57

58

59
def split_category_tree(tree: str) -> list:
4✔
60
    return category_tree(tree).split(category_separator)
×
61

62

63
def build_category_tree(reload=False, category=None) -> dict:
4✔
64
    '''Build InvenTree category tree from database data'''
65

UNCOV
66
    category_data = config_interface.load_file(settings.CONFIG_CATEGORIES)
×
67

UNCOV
68
    def build_tree(tree, left_to_go, level) -> list:
×
UNCOV
69
        try:
×
UNCOV
70
            last_entry = f' {category_tree(tree[-1])}{category_separator}'
×
UNCOV
71
        except IndexError:
×
UNCOV
72
            last_entry = ''
×
UNCOV
73
        if isinstance(left_to_go, dict):
×
UNCOV
74
            for key, value in left_to_go.items():
×
UNCOV
75
                tree.append(f'{"-" * level}{last_entry}{key}')
×
UNCOV
76
                build_tree(tree, value, level + 1)
×
UNCOV
77
        elif isinstance(left_to_go, list):
×
78
            # Supports legacy structure
79
            for item in left_to_go:
×
80
                tree.append(f'{"-" * level}{last_entry}{item}')
×
UNCOV
81
        elif left_to_go is None:
×
UNCOV
82
            pass
UNCOV
83
        return
×
84

UNCOV
85
    if reload:
×
UNCOV
86
        categories = inventree_api.get_categories()
×
UNCOV
87
        category_data.update({'CATEGORIES': categories})
×
UNCOV
88
        config_interface.dump_file(category_data, settings.CONFIG_CATEGORIES)
×
89
    else:
UNCOV
90
        categories = category_data.get('CATEGORIES', {})
×
91

92
    # Get specified branch
UNCOV
93
    if category:
×
UNCOV
94
        categories = {category: categories.get(category, {})}
×
95

UNCOV
96
    inventree_categories = []
×
97
    # Build category tree
UNCOV
98
    build_tree(inventree_categories, categories, 0)
×
99

UNCOV
100
    return inventree_categories
×
101

102

103
def build_stock_location_tree(reload=False, location=None) -> dict:
4✔
104
    '''Build InvenTree stock locations tree from database data'''
105

106
    locations_data = config_interface.load_file(settings.CONFIG_STOCK_LOCATIONS)
×
107

108
    def build_tree(tree, left_to_go, level) -> list:
×
109
        try:
×
110
            last_entry = f' {category_tree(tree[-1])}{category_separator}'
×
111
        except IndexError:
×
112
            last_entry = ''
×
113
        if isinstance(left_to_go, dict):
×
114
            for key, value in left_to_go.items():
×
115
                tree.append(f'{"-" * level}{last_entry}{key}')
×
116
                build_tree(tree, value, level + 1)
×
117
        elif isinstance(left_to_go, list):
×
118
            # Supports legacy structure
119
            for item in left_to_go:
×
120
                tree.append(f'{"-" * level}{last_entry}{item}')
×
121
        elif left_to_go is None:
×
122
            pass
123
        return
×
124

125
    if reload:
×
126
        stock_locations = inventree_api.get_stock_locations()
×
127
        locations_data.update({'STOCK_LOCATIONS': stock_locations})
×
128
        config_interface.dump_file(locations_data, settings.CONFIG_STOCK_LOCATIONS)
×
129
    else:
130
        stock_locations = locations_data.get('STOCK_LOCATIONS', {})
×
131

132
    # Get specified branch
133
    if location:
×
134
        stock_locations = {location: stock_locations.get(location, {})}
×
135

136
    inventree_stock_locations = []
×
137
    # Build category tree
138
    build_tree(inventree_stock_locations, stock_locations, 0)
×
139

140
    return inventree_stock_locations
×
141

142

143
def get_categories_from_supplier_data(part_info: dict, supplier_only=False) -> list:
4✔
144
    ''' Find categories from part supplier data, use "somewhat automatic" matching '''
145
    from thefuzz import fuzz
4✔
146
    
147
    categories = [None, None]
4✔
148

149
    try:
4✔
150
        supplier_category = str(part_info['category_tree'][0])
4✔
151
        supplier_subcategory = str(part_info['category_tree'][1])
4✔
152
    except KeyError:
×
153
        return categories
×
154

155
    # Return supplier category, if match not needed
156
    if supplier_only:
4✔
157
        categories[0] = supplier_category
×
158
        categories[1] = supplier_subcategory
×
159
        return categories
×
160

161
    function_filter = False
4✔
162
    # TODO: Make 'filter_parameter' user defined?
163
    filter_parameter = 'Function Type'
4✔
164

165
    # Check existing matches
166
    # Load inversed category map
167
    category_map = config_interface.load_supplier_categories_inversed(supplier_config_path=settings.CONFIG_DIGIKEY_CATEGORIES)
4✔
168

169
    try:
4✔
170
        for inventree_category in category_map.keys():
4✔
171
            for key, inventree_subcategory in category_map[inventree_category].items():
4✔
172
                if supplier_subcategory == key:
4✔
173
                    categories[0] = inventree_category
4✔
174
                    # Check if filtering by function
175
                    if inventree_subcategory.startswith(config_interface.FUNCTION_FILTER_KEY):
4✔
176
                        function_filter = True
×
177

178
                    # Save subcategory if not function filtered
179
                    if not function_filter:
4✔
180
                        categories[1] = inventree_subcategory
4✔
181

182
                    break
4✔
183
    except:
×
184
        pass
×
185

186
    # Function Filter
187
    if not categories[1] and function_filter:
4✔
188
        cprint(f'[INFO]\tSubcategory is filtered using "{filter_parameter}" parameter', silent=settings.SILENT, end='')
×
189
        # Load parameter map
190
        parameter_map = config_interface.load_category_parameters(categories, settings.CONFIG_SUPPLIER_PARAMETERS)
×
191
        # Build compare list
192
        compare = []
×
193
        for supplier_parameter, inventree_parameter in parameter_map.items():
×
194
            if (supplier_parameter in part_info['parameters'].keys() and inventree_parameter == filter_parameter):
×
195
                compare.append(part_info['parameters'][supplier_parameter])
×
196

197
        # Load subcategory map
198
        category_map = config_interface.load_supplier_categories(supplier_config_path=settings.CONFIG_DIGIKEY_CATEGORIES)[categories[0]]
×
199
        for inventree_subcategory in category_map.keys():
×
200
            for item in compare:
×
201
                fuzzy_match = fuzz.partial_ratio(inventree_subcategory, item)
×
202
                display_result = f'"{inventree_subcategory}" ?= "{item}"'.ljust(50)
×
203
                cprint(f'{display_result} => {fuzzy_match}', silent=settings.HIDE_DEBUG)
×
204
                if fuzzy_match >= settings.CATEGORY_MATCH_RATIO_LIMIT:
×
205
                    categories[1] = inventree_subcategory.replace(config_interface.FUNCTION_FILTER_KEY, '')
×
206
                    break
×
207

208
            if categories[1]:
×
209
                cprint('\t[ PASS ]', silent=settings.SILENT)
×
210
                break
×
211

212
    if not categories[1] and function_filter:
4✔
213
        cprint('\t[ FAILED ]', silent=settings.SILENT)
×
214

215
    # Automatic Match
216
    if not (categories[0] and categories[1]):
4✔
217
        # Load category map
218
        category_map = config_interface.load_supplier_categories(supplier_config_path=settings.CONFIG_DIGIKEY_CATEGORIES)
4✔
219

220
        def find_supplier_category_match(supplier_category: str, ignore_categories=False):
4✔
221
            # Check for match with Inventree categories
222
            category_match = None
4✔
223
            subcategory_match = None
4✔
224

225
            for inventree_category in category_map.keys():
4✔
226
                fuzzy_match = 0
4✔
227
                
228
                if not ignore_categories:
4✔
229
                    fuzzy_match = fuzz.partial_ratio(supplier_category, inventree_category)
4✔
230
                    display_result = f'"{supplier_category}" ?= "{inventree_category}"'.ljust(50)
4✔
231
                    cprint(f'{display_result} => {fuzzy_match}', silent=settings.HIDE_DEBUG)
4✔
232

233
                if fuzzy_match < settings.CATEGORY_MATCH_RATIO_LIMIT and category_map[inventree_category]:
4✔
234
                    # Compare to subcategories
235
                    for inventree_subcategory in category_map[inventree_category]:
4✔
236
                        fuzzy_match = fuzz.partial_ratio(supplier_category, inventree_subcategory)
4✔
237
                        display_result = f'"{supplier_category}" ?= "{inventree_subcategory}"'.ljust(50)
4✔
238
                        cprint(f'{display_result} => {fuzzy_match}', silent=settings.HIDE_DEBUG)
4✔
239

240
                        if fuzzy_match >= settings.CATEGORY_MATCH_RATIO_LIMIT:
4✔
241
                            subcategory_match = inventree_subcategory
4✔
242
                            break
4✔
243

244
                if fuzzy_match >= settings.CATEGORY_MATCH_RATIO_LIMIT:
4✔
245
                    category_match = inventree_category
4✔
246
                    break
4✔
247

248
            return category_match, subcategory_match
4✔
249

250
        # Find category and subcategories match
251
        category, subcategory = find_supplier_category_match(supplier_category)
4✔
252
        if category:
4✔
253
            categories[0] = category
4✔
254
        if subcategory:
4✔
255
            categories[1] = subcategory
×
256

257
        # Run match with supplier subcategory
258
        if not categories[0] or not categories[1]:
4✔
259
            if categories[0]:
4✔
260
                # If category was found: ignore them for the comparison
261
                category, subcategory = find_supplier_category_match(supplier_subcategory, ignore_categories=True)
4✔
262
            else:
263
                category, subcategory = find_supplier_category_match(supplier_subcategory)
4✔
264

265
        if category and not categories[0]:
4✔
266
            categories[0] = category
4✔
267
        if subcategory and not categories[1]:
4✔
268
            categories[1] = subcategory
4✔
269

270
    # Final checks
271
    if not categories[0]:
4✔
272
        cprint(f'[INFO]\tWarning: "{part_info["category_tree"][0]}" did not match any supplier category ', silent=settings.SILENT)
×
273
    else:
274
        cprint(f'[INFO]\tCategory: "{categories[0]}"', silent=settings.SILENT)
4✔
275
    if not categories[1]:
4✔
276
        cprint(f'[INFO]\tWarning: "{part_info["category_tree"][1]}" did not match any supplier subcategory ', silent=settings.SILENT)
4✔
277
    else:
278
        cprint(f'[INFO]\tSubcategory: "{categories[1]}"', silent=settings.SILENT)
4✔
279
    
280
    # print(f'{supplier_category=} | {supplier_subcategory=} | {categories[0]=} | {categories[1]=}')
281
    return categories
4✔
282

283

284
def translate_form_to_inventree(part_info: dict, category_tree: list, is_custom=False) -> dict:
4✔
285
    ''' Using supplier part data and categories, fill-in InvenTree part dictionary '''
286

287
    # Copy template
UNCOV
288
    inventree_part = copy.deepcopy(settings.inventree_part_template)
×
289

290
    # Translate form data to inventree part
UNCOV
291
    inventree_part['category_tree'] = category_tree
×
UNCOV
292
    inventree_part['name'] = part_info['name']
×
UNCOV
293
    inventree_part['description'] = part_info['description']
×
UNCOV
294
    inventree_part['revision'] = part_info['revision']
×
UNCOV
295
    inventree_part['keywords'] = part_info['keywords']
×
UNCOV
296
    inventree_part['supplier_name'] = part_info['supplier_name']
×
UNCOV
297
    inventree_part['supplier_part_number'] = part_info['supplier_part_number']
×
UNCOV
298
    inventree_part['manufacturer_name'] = part_info['manufacturer_name']
×
UNCOV
299
    inventree_part['manufacturer_part_number'] = part_info['manufacturer_part_number']
×
UNCOV
300
    inventree_part['IPN'] = part_info.get('IPN', '')
×
301
    # Replace whitespaces in URL
UNCOV
302
    inventree_part['supplier_link'] = part_info['supplier_link'].replace(' ', '%20')
×
UNCOV
303
    inventree_part['datasheet'] = part_info['datasheet'].replace(' ', '%20')
×
304
    # Image URL is not shown to user so force default key/value
UNCOV
305
    try:
×
UNCOV
306
        inventree_part['image'] = part_info['image'].replace(' ', '%20')
×
UNCOV
307
    except AttributeError:
×
308
        # Part image URL is null (no product picture)
UNCOV
309
        pass
×
UNCOV
310
    inventree_part['pricing'] = part_info.get('pricing', {})
×
UNCOV
311
    inventree_part['currency'] = part_info.get('currency', 'USD')
×
312

UNCOV
313
    parameters = part_info.get('parameters', {})
×
314

315
    # Load parameters map
UNCOV
316
    if category_tree:
×
UNCOV
317
        parameter_map = config_interface.load_category_parameters(
×
318
            categories=category_tree,
319
            supplier_config_path=settings.CONFIG_SUPPLIER_PARAMETERS,
320
        )
321
    else:
322
        cprint('[INFO]\tWarning: Parameter map not loaded (no category selected)', silent=settings.SILENT)
×
323

UNCOV
324
    if not is_custom:
×
325
        # Add Parameters
UNCOV
326
        if parameter_map:
×
UNCOV
327
            parameters_missing = []
×
UNCOV
328
            for supplier_param, inventree_param in parameter_map.items():
×
329
                # Some parameters may not be mapped
UNCOV
330
                if inventree_param not in inventree_part['parameters'].keys():
×
UNCOV
331
                    if supplier_param == 'Manufacturer Part Number':
×
UNCOV
332
                        inventree_part['parameters'][inventree_param] = part_info['manufacturer_part_number']
×
UNCOV
333
                    elif inventree_param == 'image':
×
334
                        inventree_part['existing_image'] = supplier_param
×
335
                    else:
UNCOV
336
                        try:
×
UNCOV
337
                            parameter_value = part_tools.clean_parameter_value(
×
338
                                category=category_tree[0],
339
                                name=supplier_param,
340
                                value=parameters[supplier_param],
341
                            )
UNCOV
342
                            inventree_part['parameters'][inventree_param] = parameter_value
×
UNCOV
343
                        except KeyError:
×
UNCOV
344
                            parameters_missing.append(supplier_param)
×
UNCOV
345
            if parameters_missing:
×
UNCOV
346
                msg = '[INFO]\tWarning: The following parameters were not found in supplier data:\n'
×
UNCOV
347
                msg += str(parameters_missing)
×
UNCOV
348
                cprint(msg, silent=settings.SILENT)
×
349

350
            # Check for missing InvenTree parameters and fill value with dash
UNCOV
351
            for inventree_param in parameter_map.values():
×
UNCOV
352
                if inventree_param == 'image':
×
353
                    continue
×
UNCOV
354
                if inventree_param not in inventree_part['parameters'].keys():
×
UNCOV
355
                    inventree_part['parameters'][inventree_param] = '-'
×
356

357
            # Check for extra parameters which weren't mapped
UNCOV
358
            parameters_unmapped = []
×
UNCOV
359
            for search_param in parameters.keys():
×
UNCOV
360
                if search_param not in parameter_map.keys():
×
UNCOV
361
                    parameters_unmapped.append(search_param)
×
362
            
UNCOV
363
            if parameters_unmapped:
×
UNCOV
364
                if not settings.SILENT:
×
365
                    msg = f'[INFO]\tThe following parameters are not mapped in {inventree_part["supplier_name"]} parameters configuration:\n'
×
366
                    msg += str(parameters_unmapped)
×
367
                    print(msg)
×
368
        else:
369
            cprint(f'[INFO]\tWarning: Parameter map for "{category_tree[0]}" does not exist or is empty', silent=settings.SILENT)
×
370

UNCOV
371
    return inventree_part
×
372

373

374
def get_supplier_name(supplier: str) -> str:
4✔
375
    ''' Get InvenTree supplier name '''
376

377
    supplier_name = supplier
×
378

379
    for supplier, data in settings.CONFIG_SUPPLIERS.items():
×
380
        if data['name'] == supplier_name:
×
381
            # Update supplier name
382
            supplier_name = supplier
×
383
            break
×
384
    
385
    return supplier_name
×
386

387

388
def translate_supplier_to_form(supplier: str, part_info: dict) -> dict:
4✔
389
    ''' Translate supplier data to user form format '''
390

391
    part_form = {}
4✔
392

393
    def get_value_from_user_key(user_key: str, default_key: str, default_value=None) -> str:
4✔
394
        ''' Get value mapped from user search key, else default search key '''
395
        user_search_key = None
4✔
396
        if supplier == 'Digi-Key':
4✔
397
            user_search_key = settings.CONFIG_DIGIKEY.get(user_key, None)
4✔
398
        elif supplier == 'Mouser':
×
399
            user_search_key = settings.CONFIG_MOUSER.get(user_key, None)
×
400
        elif supplier in ['Farnell', 'Newark', 'Element14']:
×
401
            user_search_key = settings.CONFIG_ELEMENT14.get(user_key, None)
×
402
        elif supplier == 'LCSC':
×
403
            user_search_key = settings.CONFIG_LCSC.get(user_key, None)
×
404
        elif supplier == 'Jameco':
×
405
            user_search_key = settings.CONFIG_JAMECO.get(user_key, None)
×
406
        elif supplier == 'TME':
×
407
            user_search_key = settings.CONFIG_TME.get(user_key, None)
×
408
        elif supplier == 'AutomationDirect':
×
409
            user_search_key = settings.CONFIG_AUTOMATIONDIRECT.get(user_key, None)
×
410

411
        else:
412
            return default_value
×
413
        
414
        # If no user key, use default
415
        if not user_search_key:
4✔
416
            return part_info.get(default_key, default_value)
4✔
417

418
        # Get value for user key, return value from default key if not found
419
        return part_info.get(user_search_key, part_info.get(default_key, default_value))
×
420

421
    # Check that supplier argument is valid
422
    if not supplier and supplier != 'custom':
4✔
423
        return part_form
×
424
    # Get default keys
425
    if supplier == 'Digi-Key':
4✔
426
        default_search_keys = digikey_api.get_default_search_keys()
4✔
427
    elif supplier == 'Mouser':
×
428
        default_search_keys = mouser_api.get_default_search_keys()
×
429
    elif supplier in ['Farnell', 'Newark', 'Element14']:
×
430
        default_search_keys = element14_api.get_default_search_keys()
×
431
    elif supplier == 'LCSC':
×
432
        default_search_keys = lcsc_api.get_default_search_keys()
×
433
    elif supplier == 'Jameco':
×
434
        default_search_keys = jameco_api.get_default_search_keys()
×
435
    elif supplier == 'TME':
×
436
        default_search_keys = tme_api.get_default_search_keys()
×
437
    elif supplier == 'AutomationDirect':
×
438
        default_search_keys = automationdirect_api.get_default_search_keys()
×
439
    else:
440
        # Empty array of default search keys
441
        default_search_keys = [''] * len(digikey_api.get_default_search_keys())
×
442

443
    # Default revision
444
    revision = settings.CONFIG_IPN.get('INVENTREE_DEFAULT_REV', '')
4✔
445
    # Translate supplier data to form fields
446
    part_form['name'] = get_value_from_user_key('SEARCH_NAME', default_search_keys[0], default_value='')
4✔
447
    part_form['description'] = get_value_from_user_key('SEARCH_DESCRIPTION', default_search_keys[1], default_value='')
4✔
448
    part_form['revision'] = get_value_from_user_key('SEARCH_REVISION', default_search_keys[2], default_value=revision)
4✔
449
    part_form['keywords'] = get_value_from_user_key('SEARCH_KEYWORDS', default_search_keys[3], default_value='')
4✔
450
    part_form['supplier_name'] = settings.CONFIG_SUPPLIERS[supplier]['name']
4✔
451
    part_form['supplier_part_number'] = get_value_from_user_key('SEARCH_SKU', default_search_keys[4], default_value='')
4✔
452
    part_form['supplier_link'] = get_value_from_user_key('SEARCH_SUPPLIER_URL', default_search_keys[7], default_value='')
4✔
453
    part_form['manufacturer_name'] = get_value_from_user_key('SEARCH_MANUFACTURER', default_search_keys[5], default_value='')
4✔
454
    part_form['manufacturer_part_number'] = get_value_from_user_key('SEARCH_MPN', default_search_keys[6], default_value='')
4✔
455
    part_form['datasheet'] = get_value_from_user_key('SEARCH_DATASHEET', default_search_keys[8], default_value='')
4✔
456
    part_form['image'] = get_value_from_user_key('', default_search_keys[9], default_value='')
4✔
457
    
458
    return part_form
4✔
459

460

461
def supplier_search(supplier: str, part_number: str, test_mode=False) -> dict:
4✔
462
    ''' Wrapper for supplier search, allow use of cached data (limited daily API calls) '''
463
    part_info = {}
4✔
464
    # Check part number exist
465
    if not part_number:
4✔
UNCOV
466
        cprint('\n[MAIN]\tError: Missing Part Number', silent=settings.SILENT)
×
UNCOV
467
        return part_info
×
468

469
    store = ''
4✔
470
    if supplier in ['Farnell', 'Newark', 'Element14']:
4✔
471
        try:
×
472
            element14_config = config_interface.load_file(settings.CONFIG_ELEMENT14_API)
×
473
            store = element14_config.get(f'{supplier.upper()}_STORE', '').replace(' ', '')
×
474
        except AttributeError:
×
475
            cprint(f'\n[INFO]\tWarning: {supplier.upper()}_STORE value not found', silent=False)
×
476

477
    search_filename = f"{settings.search_results['directory']}{supplier}{store}_{part_number}{settings.search_results['extension']}"
4✔
478
    # Get cached data, if cache is enabled (else returns None)
479
    part_cache = search_api.load_from_file(search_filename, test_mode)
4✔
480

481
    if part_cache:
4✔
482
        cprint(f'\n[MAIN]\tUsing {supplier} cached data for {part_number}', silent=settings.SILENT)
4✔
483
        part_info = part_cache
4✔
484
    else:
485
        cprint(f'\n[MAIN]\t{supplier} search for {part_number}', silent=settings.SILENT)
×
486
        if supplier == 'Digi-Key':
×
487
            part_info = digikey_api.fetch_part_info(part_number)
×
488
        elif supplier == 'Mouser':
×
489
            part_info = mouser_api.fetch_part_info(part_number)
×
490
        elif supplier in ['Farnell', 'Newark', 'Element14']:
×
491
            part_info = element14_api.fetch_part_info(part_number, supplier)
×
492
        elif supplier == 'LCSC':
×
493
            part_info = lcsc_api.fetch_part_info(part_number)
×
494
        elif supplier == 'Jameco':
×
495
            part_info = jameco_api.fetch_part_info(part_number)
×
496
        elif supplier == 'TME':
×
497
            part_info = tme_api.fetch_part_info(part_number)
×
498
        elif supplier == 'AutomationDirect':
×
499
            part_info = automationdirect_api.fetch_part_info(part_number)
×
500

501
    # Check supplier data exist
502
    if not part_info:
4✔
503
        cprint(f'[INFO]\tError: Failed to fetch data for "{part_number}"', silent=settings.SILENT)
×
504

505
    # Save search results
506
    if part_info:
4✔
507
        update_ts = not bool(part_cache) or test_mode
4✔
508
        search_api.save_to_file(part_info, search_filename, update_ts=update_ts)
4✔
509

510
    return part_info
4✔
511

512

513
def inventree_fuzzy_company_match(name: str) -> str:
4✔
514
    ''' Fuzzy match company name to exisiting companies '''
UNCOV
515
    from thefuzz import fuzz
×
516
    
UNCOV
517
    inventree_companies = inventree_api.get_all_companies()
×
518

UNCOV
519
    for company_name in inventree_companies.keys():
×
UNCOV
520
        cprint(f'{name.lower()} == {company_name.lower()} % {fuzz.partial_ratio(name.lower(), company_name.lower())}',
×
521
               silent=settings.HIDE_DEBUG)
UNCOV
522
        if fuzz.partial_ratio(name.lower(), company_name.lower()) == 100 and len(name) == len(company_name):
×
UNCOV
523
            return company_name
×
524
    
UNCOV
525
    return name
×
526

527

528
def inventree_create_manufacturer_part(part_id: int, manufacturer_name: str, manufacturer_mpn: str, datasheet: str, description: str) -> bool:
4✔
529
    ''' Create manufacturer part '''
530

UNCOV
531
    cprint('\n[MAIN]\tCreating manufacturer part', silent=settings.SILENT)
×
UNCOV
532
    manufacturer_part = inventree_api.is_new_manufacturer_part(manufacturer_name=manufacturer_name,
×
533
                                                               manufacturer_mpn=manufacturer_mpn)
534

UNCOV
535
    if manufacturer_part:
×
UNCOV
536
        cprint('[INFO]\tManufacturer part already exists, skipping.', silent=settings.SILENT)
×
537
    else:
538
        # Create a new manufacturer part
UNCOV
539
        is_manufacturer_part_created = inventree_api.create_manufacturer_part(part_id=part_id,
×
540
                                                                              manufacturer_name=manufacturer_name,
541
                                                                              manufacturer_mpn=manufacturer_mpn,
542
                                                                              datasheet=datasheet,
543
                                                                              description=description)
544

UNCOV
545
        if is_manufacturer_part_created:
×
UNCOV
546
            cprint('[INFO]\tSuccess: Added new manufacturer part', silent=settings.SILENT)
×
UNCOV
547
            return True
×
548

UNCOV
549
    return False
×
550

551

552
def inventree_create_supplier_part(part) -> bool:
4✔
553
    return
×
554

555

556
def get_inventree_stock_location_id(stock_location_tree: list):
4✔
557
    return inventree_api.get_inventree_stock_location_id(stock_location_tree)
×
558

559

560
def inventree_create(part_info: dict, stock=None, kicad=False, symbol=None, footprint=None, show_progress=True, is_custom=False, enable_upload=True):
4✔
561
    ''' Create InvenTree part from supplier part data and categories '''
562

UNCOV
563
    part_pk = 0
×
UNCOV
564
    new_part = False
×
565

UNCOV
566
    category_tree = part_info['category_tree']
×
UNCOV
567
    if not category_tree:
×
568
        cprint(f'[INFO]\tError: Category tree is empty {category_tree=}', silent=settings.SILENT)
×
569
        return new_part, part_pk, {}
×
570

571
    # Translate to InvenTree part format
UNCOV
572
    inventree_part = translate_form_to_inventree(
×
573
        part_info=part_info,
574
        category_tree=category_tree,
575
        is_custom=is_custom,
576
    )
577

UNCOV
578
    if not inventree_part:
×
579
        cprint('\n[MAIN]\tError: Failed to process form data', silent=settings.SILENT)
×
580

UNCOV
581
    category_pk = inventree_api.get_inventree_category_id(category_tree)
×
UNCOV
582
    if category_pk <= 0:
×
583
        cprint(f'[ERROR]\tCategory ({category_tree}) does not exist in InvenTree', silent=settings.SILENT)
×
584
    else:
UNCOV
585
        if settings.CHECK_EXISTING:
×
586
            # Check if part already exists
UNCOV
587
            part_pk = inventree_api.is_new_part(category_pk, inventree_part)
×
588
            # Part exists
UNCOV
589
            if part_pk > 0:
×
UNCOV
590
                cprint('[INFO]\tPart already exists, skipping.', silent=settings.SILENT)
×
UNCOV
591
                info = inventree_api.get_part_info(part_pk)
×
UNCOV
592
                if info:
×
593
                    # Update InvenTree part number
UNCOV
594
                    inventree_part = {**inventree_part, **info}
×
595
                    # Update InvenTree URL
UNCOV
596
                    inventree_part['inventree_url'] = f'{settings.PART_URL_ROOT}{inventree_part["IPN"]}/'
×
597
                else:
598
                    inventree_part['inventree_url'] = f'{settings.PART_URL_ROOT}{part_pk}/'
×
599
        # Part is new
UNCOV
600
        if not part_pk:
×
UNCOV
601
            new_part = True
×
UNCOV
602
            if settings.CONFIG_IPN.get('IPN_ENABLE_CREATE', True):
×
603
                # Generate Placeholder Internal Part Number
UNCOV
604
                ipn = part_tools.generate_part_number(
×
605
                    category=category_tree[0],
606
                    part_pk=0,
607
                    category_code=part_info.get('category_code', ''),
608
                )
609
            else:
610
                ipn = ''
×
611
            # Create a new Part
612
            # Use the pk (primary-key) of the category
UNCOV
613
            part_pk = inventree_api.create_part(
×
614
                category_id=category_pk,
615
                name=inventree_part['name'],
616
                description=inventree_part['description'],
617
                revision=inventree_part['revision'],
618
                keywords=inventree_part['keywords'],
619
                ipn=ipn)
620

621
            # Check part primary key
UNCOV
622
            if not part_pk:
×
623
                return new_part, part_pk, inventree_part
×
624
            # Progress Update
UNCOV
625
            if not progress.update_progress_bar(show_progress):
×
626
                return new_part, part_pk, inventree_part
×
627

UNCOV
628
            if settings.CONFIG_IPN.get('IPN_ENABLE_CREATE', True):
×
629
                # Generate Internal Part Number
UNCOV
630
                cprint('\n[MAIN]\tGenerating Internal Part Number', silent=settings.SILENT)
×
UNCOV
631
                if settings.CONFIG_IPN.get('IPN_USE_MANUFACTURER_PART_NUMBER', False):
×
632
                    ipn = inventree_part['manufacturer_part_number']
×
633
                else:
UNCOV
634
                    ipn = part_tools.generate_part_number(
×
635
                        category=category_tree[0],
636
                        part_pk=part_pk,
637
                        category_code=part_info.get('category_code', ''),
638
                    )
UNCOV
639
                cprint(f'[INFO]\tInternal Part Number = {ipn}', silent=settings.SILENT)
×
640
                # Update InvenTree part number
UNCOV
641
                ipn_update = inventree_api.set_part_number(part_pk, ipn)
×
UNCOV
642
                if not ipn_update:
×
643
                    cprint('\n[INFO]\tError updating IPN', silent=settings.SILENT)
×
UNCOV
644
                inventree_part['IPN'] = ipn
×
645
                # Update InvenTree URL
UNCOV
646
                inventree_part['inventree_url'] = f'{settings.PART_URL_ROOT}{inventree_part["IPN"]}/'
×
647
            else:
648
                inventree_part['inventree_url'] = f'{settings.PART_URL_ROOT}{part_pk}/'
×
649

650
    # Progress Update
UNCOV
651
    if not progress.update_progress_bar(show_progress):
×
652
        return new_part, part_pk, inventree_part
×
653

UNCOV
654
    if part_pk > 0:
×
UNCOV
655
        if new_part:
×
UNCOV
656
            cprint('[INFO]\tSuccess: Added new part to InvenTree', silent=settings.SILENT)
×
UNCOV
657
            if inventree_part.get('existing_image', ''):
×
658
                inventree_api.update_part(
×
659
                    part_pk,
660
                    data={'existing_image': inventree_part['existing_image']})
UNCOV
661
            elif inventree_part['image']:
×
UNCOV
662
                if enable_upload:
×
663
                    # Add image
UNCOV
664
                    image_result = inventree_api.upload_part_image(inventree_part['image'], part_pk, silent=settings.SILENT)
×
UNCOV
665
                    if not image_result:
×
666
                        cprint('[TREE]\tWarning: Failed to upload part image', silent=settings.SILENT)
×
UNCOV
667
        if inventree_part['datasheet'] and settings.DATASHEET_UPLOAD:
×
UNCOV
668
            if enable_upload:
×
669
                # Upload datasheet
UNCOV
670
                datasheet_link = inventree_api.upload_part_datasheet(
×
671
                    datasheet_url=inventree_part['datasheet'],
672
                    part_ipn=inventree_part['IPN'],
673
                    part_pk=part_pk,
674
                    silent=settings.SILENT,
675
                )
UNCOV
676
                if not datasheet_link:
×
677
                    cprint('[TREE]\tWarning: Failed to upload part datasheet', silent=settings.SILENT)
×
678
                else:
UNCOV
679
                    cprint('[TREE]\tSuccess: Uploaded part datasheet', silent=settings.SILENT)
×
680

UNCOV
681
        if kicad:
×
UNCOV
682
            try:
×
UNCOV
683
                symbol_name = ipn
×
684
            except UnboundLocalError:
×
685
                symbol_name = inventree_part.get('manufacturer_part_number')
×
686

687
            # Create symbol & footprint parameters
UNCOV
688
            if symbol:
×
UNCOV
689
                symbol = f'{symbol.split(":")[0]}:{symbol_name}'
×
UNCOV
690
                inventree_part['parameters']['Symbol'] = symbol
×
UNCOV
691
            if footprint:
×
692
                inventree_part['parameters']['Footprint'] = footprint
×
693

UNCOV
694
        if not inventree_part['parameters']:
×
695
            category_parameters = inventree_api.get_category_parameters(category_pk)
×
696

697
            # Add category-defined parameters
698
            for parameter in category_parameters:
×
699
                inventree_part['parameters'][parameter[0]] = parameter[1]
×
700

701
        # Create parameters
UNCOV
702
        if len(inventree_part['parameters']) > 0:
×
UNCOV
703
            if not inventree_process_parameters(
×
704
                    part_id=part_pk,
705
                    parameters=inventree_part['parameters'],
706
                    show_progress=show_progress):
707
                return new_part, part_pk, inventree_part
×
708
            
709
        # Create manufacturer part
UNCOV
710
        if inventree_part['manufacturer_name'] and inventree_part['manufacturer_part_number']:
×
711
            # Overwrite manufacturer name with matching one from database
UNCOV
712
            manufacturer_name = inventree_fuzzy_company_match(inventree_part['manufacturer_name'])
×
713
            # Get MPN
UNCOV
714
            manufacturer_mpn = inventree_part['manufacturer_part_number']
×
715

UNCOV
716
            cprint('\n[MAIN]\tCreating manufacturer part', silent=settings.SILENT)
×
UNCOV
717
            manufacturer_part = inventree_api.is_new_manufacturer_part(
×
718
                manufacturer_name=manufacturer_name,
719
                manufacturer_mpn=manufacturer_mpn,
720
            )
721

UNCOV
722
            if manufacturer_part:
×
723
                cprint('[INFO]\tManufacturer part already exists, skipping.', silent=settings.SILENT)
×
724
            else:
725
                # Create a new manufacturer part
UNCOV
726
                is_manufacturer_part_created = inventree_api.create_manufacturer_part(
×
727
                    part_id=part_pk,
728
                    manufacturer_name=manufacturer_name,
729
                    manufacturer_mpn=manufacturer_mpn,
730
                    datasheet=inventree_part['datasheet'],
731
                    description=inventree_part['description'],
732
                )
733

UNCOV
734
                if is_manufacturer_part_created:
×
UNCOV
735
                    cprint('[INFO]\tSuccess: Added new manufacturer part', silent=settings.SILENT)
×
736

737
        # Create supplier part
UNCOV
738
        if inventree_part['supplier_name'] and inventree_part['supplier_part_number']:
×
739
            # Overwrite manufacturer name with matching one from database
UNCOV
740
            supplier_name = inventree_fuzzy_company_match(inventree_part['supplier_name'])
×
741
            # Get SKU
UNCOV
742
            supplier_sku = inventree_part['supplier_part_number']
×
743

UNCOV
744
            cprint('\n[MAIN]\tCreating supplier part', silent=settings.SILENT)
×
UNCOV
745
            is_new_supplier_part, supplier_part = inventree_api.is_new_supplier_part(
×
746
                supplier_name=supplier_name,
747
                supplier_sku=supplier_sku)
748

UNCOV
749
            if not is_new_supplier_part:
×
750
                cprint('[INFO]\tSupplier part already exists, skipping.', silent=settings.SILENT)
×
751
            else:
752
                # Create a new supplier part
UNCOV
753
                is_supplier_part_created, supplier_part = inventree_api.create_supplier_part(
×
754
                    part_id=part_pk,
755
                    manufacturer_name=manufacturer_name,
756
                    manufacturer_mpn=manufacturer_mpn,
757
                    supplier_name=supplier_name,
758
                    supplier_sku=supplier_sku,
759
                    description=inventree_part['description'],
760
                    link=inventree_part['supplier_link'],
761
                )
762

UNCOV
763
                if is_supplier_part_created:
×
UNCOV
764
                    cprint('[INFO]\tSuccess: Added new supplier part', silent=settings.SILENT)
×
765
            
UNCOV
766
            if supplier_part and settings.PRICING_UPLOAD:
×
767
                cprint('\n[MAIN]\tProcessing Price Breaks', silent=settings.SILENT)
×
768
                inventree_api.update_price_breaks(
×
769
                    supplier_part=supplier_part,
770
                    price_breaks=inventree_part['pricing'],
771
                    currency=inventree_part['currency'])
772

UNCOV
773
        if stock is not None:
×
774
            stock['part'] = part_pk
×
775
            inventree_api.create_stock(stock)
×
776
            if stock['make_default']:
×
777
                inventree_api.set_part_default_location(part_pk, stock['location'])
×
778

779
    # Progress Update
UNCOV
780
    if not progress.update_progress_bar(show_progress):
×
781
        pass
782

UNCOV
783
    return new_part, part_pk, inventree_part
×
784

785

786
def inventree_process_parameters(part_id: str, parameters: dict, show_progress=True) -> bool:
4✔
787
    ''' Create or Update parameters for an InvenTree part'''
UNCOV
788
    cprint('\n[MAIN]\tCreating parameters', silent=settings.SILENT)
×
UNCOV
789
    parameters_lists = [
×
790
        [],  # Store new parameters
791
        [],  # Store updated parameters
792
        [],  # Store unchanged parameters
793
    ]
UNCOV
794
    for name, value in parameters.items():
×
UNCOV
795
        parameter, is_new_parameter, was_updated = inventree_api.create_parameter(part_id=part_id, template_name=name, value=value)
×
796
        # Progress Update
UNCOV
797
        if not progress.update_progress_bar(show_progress, increment=0.03):
×
798
            return False
×
UNCOV
799
        if is_new_parameter:
×
UNCOV
800
            parameters_lists[0].append(name)
×
UNCOV
801
        elif was_updated:
×
802
            parameters_lists[1].append(name)
×
803
        else:
UNCOV
804
            parameters_lists[2].append(name)
×
UNCOV
805
    if parameters_lists[0]:
×
UNCOV
806
        cprint('[INFO]\tSuccess: The following parameters were created:', silent=settings.SILENT)
×
UNCOV
807
        for item in parameters_lists[0]:
×
UNCOV
808
            cprint(f'--->\t{item}', silent=settings.SILENT)
×
UNCOV
809
    if parameters_lists[1]:
×
810
        cprint('[INFO]\tSuccess: The following parameters were updated:', silent=settings.SILENT)
×
811
        for item in parameters_lists[1]:
×
812
            cprint(f'--->\t{item}', silent=settings.SILENT)
×
UNCOV
813
    if parameters_lists[2]:
×
UNCOV
814
        cprint('[TREE]\tWarning: The following parameters were skipped:', silent=settings.SILENT)
×
UNCOV
815
        for item in parameters_lists[2]:
×
UNCOV
816
            cprint(f'--->\t{item}', silent=settings.SILENT)
×
UNCOV
817
    return True
×
818

819

820
def inventree_create_alternate(part_info: dict, part_id='', part_ipn='', show_progress=None) -> bool:
4✔
821
    ''' Create alternate manufacturer and supplier entries for an existing InvenTree part '''
822

UNCOV
823
    result = False
×
UNCOV
824
    cprint('\n[MAIN]\tSearching for original part in database', silent=settings.SILENT)
×
UNCOV
825
    part = inventree_api.fetch_part(part_id, part_ipn)
×
826

UNCOV
827
    if part:
×
UNCOV
828
        part_pk = part.pk
×
UNCOV
829
        part_description = part.description
×
UNCOV
830
        cprint(f'[INFO] Success: Found original part in database (ID = {part_pk} | Description = "{part_description}")', silent=settings.SILENT)
×
831
    else:
832
        cprint('[INFO] Error: Original part was not found in database', silent=settings.SILENT)
×
833
        return result
×
834
    # Translate to InvenTree part format
UNCOV
835
    category_tree = inventree_api.get_category_tree(part.category)
×
UNCOV
836
    category_tree = list(category_tree.values())
×
UNCOV
837
    category_tree.reverse()
×
UNCOV
838
    inventree_part = translate_form_to_inventree(
×
839
        part_info=part_info,
840
        category_tree=category_tree,
841
    )
842

843
    # If the part has no image yet try to upload it from the data
UNCOV
844
    if not part.image:
×
UNCOV
845
        image = part_info.get('image', '')
×
UNCOV
846
        existing_image = inventree_part.get('existing_image', '')
×
UNCOV
847
        if existing_image:
×
848
            inventree_api.update_part(pk=part_pk,
×
849
                                      data={'existing_image': existing_image})
UNCOV
850
        elif image:
×
851
            inventree_api.upload_part_image(image_url=image, part_id=part_pk, silent=settings.SILENT)
×
852

853
    # create or update parameters
UNCOV
854
    if inventree_part.get('parameters', {}):
×
UNCOV
855
        inventree_process_parameters(part_id=part_pk,
×
856
                                     parameters=inventree_part['parameters'],
857
                                     show_progress=show_progress)
858

859
    # Overwrite manufacturer name with matching one from database
UNCOV
860
    manufacturer_name = inventree_fuzzy_company_match(part_info.get('manufacturer_name', ''))
×
UNCOV
861
    manufacturer_mpn = part_info.get('manufacturer_part_number', '')
×
UNCOV
862
    datasheet = part_info.get('datasheet', '')
×
863

UNCOV
864
    attachment = part.getAttachments()
×
865
    # if datasheet upload is enabled and no attachment present yet then upload
UNCOV
866
    if settings.DATASHEET_UPLOAD and not attachment:
×
UNCOV
867
        if datasheet:
×
UNCOV
868
            part_info['datasheet'] = inventree_api.upload_part_datasheet(
×
869
                datasheet_url=datasheet,
870
                part_ipn=part_ipn,
871
                part_pk=part_id,
872
                silent=settings.SILENT,
873
            )
UNCOV
874
            if not part_info['datasheet']:
×
875
                cprint('[TREE]\tWarning: Failed to upload part datasheet', silent=settings.SILENT)
×
876
            else:
UNCOV
877
                cprint('[TREE]\tSuccess: Uploaded part datasheet', silent=settings.SILENT)
×
878
    # if an attachment is present, set it as the datasheet field
UNCOV
879
    if attachment:
×
UNCOV
880
        part_info['datasheet'] = f'{inventree_api.inventree_api.base_url.strip("/")}{attachment[0]["attachment"]}'
×
881

882
    # Create manufacturer part
UNCOV
883
    if manufacturer_name and manufacturer_mpn:
×
UNCOV
884
        inventree_create_manufacturer_part(part_id=part_pk,
×
885
                                           manufacturer_name=manufacturer_name,
886
                                           manufacturer_mpn=manufacturer_mpn,
887
                                           datasheet=datasheet,
888
                                           description=part_description)
889
    else:
890
        cprint('[INFO]\tWarning: No manufacturer part to create', silent=settings.SILENT)
×
891

892
    # Progress Update
UNCOV
893
    if not progress.update_progress_bar(show_progress, increment=0.2):
×
894
        return
×
895

UNCOV
896
    supplier_name = part_info.get('supplier_name', '')
×
UNCOV
897
    supplier_sku = part_info.get('supplier_part_number', '')
×
UNCOV
898
    supplier_link = part_info.get('supplier_link', '')
×
899

900
    # Add supplier alternate
UNCOV
901
    if supplier_name and supplier_sku:
×
UNCOV
902
        cprint('\n[MAIN]\tCreating supplier part', silent=settings.SILENT)
×
UNCOV
903
        is_new_supplier_part, supplier_part = inventree_api.is_new_supplier_part(
×
904
            supplier_name=supplier_name,
905
            supplier_sku=supplier_sku)
906

UNCOV
907
        if not is_new_supplier_part:
×
UNCOV
908
            cprint('[INFO]\tSupplier part already exists, skipping.', silent=settings.SILENT)
×
909
        else:
910
            # Create a new supplier part
UNCOV
911
            is_supplier_part_created, supplier_part = inventree_api.create_supplier_part(
×
912
                part_id=part_pk,
913
                manufacturer_name=manufacturer_name,
914
                manufacturer_mpn=manufacturer_mpn,
915
                supplier_name=supplier_name,
916
                supplier_sku=supplier_sku,
917
                description=part_description,
918
                link=supplier_link)
919

UNCOV
920
            if is_supplier_part_created:
×
UNCOV
921
                cprint('[INFO]\tSuccess: Added new supplier part', silent=settings.SILENT)
×
UNCOV
922
                result = True
×
923

UNCOV
924
        if supplier_part and settings.PRICING_UPLOAD:
×
925
            cprint('\n[MAIN]\tProcessing Price Breaks', silent=settings.SILENT)
×
926
            inventree_api.update_price_breaks(
×
927
                supplier_part=supplier_part,
928
                price_breaks=inventree_part['pricing'],
929
                currency=inventree_part['currency'])
930
            result = True
×
931
    
932
    else:
933
        cprint('[INFO]\tWarning: No supplier part to create', silent=settings.SILENT)
×
934

UNCOV
935
    return result
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc