• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Chaffelson / nipyapi / #2

25 Mar 2025 10:32AM CUT coverage: 69.867% (+69.9%) from 0.0%
#2

push

coveralls-python

web-flow
V022release (#384)

* Update history for release

* Bump version: 0.21.0 → 0.22.0

1 of 1 new or added line in 1 file covered. (100.0%)

1048 of 1500 relevant lines covered (69.87%)

0.7 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

64.0
/nipyapi/utils.py
1
#!/usr/bin/env python
2
# -*- coding: utf-8 -*-
3

4
"""
5
Convenience utility functions for NiPyApi, not really intended for external use
6
"""
7

8
from __future__ import absolute_import, unicode_literals
1✔
9
import logging
1✔
10
import json
1✔
11
import io
1✔
12
import time
1✔
13
from copy import copy
1✔
14
from functools import reduce, wraps
1✔
15
import operator
1✔
16
from contextlib import contextmanager
1✔
17
from packaging import version
1✔
18
import six
1✔
19
from ruamel.yaml import YAML
1✔
20
from ruamel.yaml.compat import StringIO
1✔
21
import requests
1✔
22
from requests.models import Response
1✔
23
from future.utils import raise_from as _raise
1✔
24
import nipyapi
1✔
25
from nipyapi.config import default_string_encoding as DEF_ENCODING
1✔
26

27
# Names exported by a star-import of this module.
__all__ = ['dump', 'load', 'fs_read', 'fs_write', 'filter_obj',
           'wait_to_complete', 'is_endpoint_up', 'set_endpoint',
           'start_docker_containers', 'DockerContainer',
           'infer_object_label_from_class', 'bypass_slash_encoding',
           'exception_handler', 'enforce_min_ver', 'check_version',
           'validate_parameters_versioning_support'
           ]

# Module-level logger, named after this module.
log = logging.getLogger(__name__)
1✔
36

37
# docker is treated as an optional dependency: record whether it could be
# imported so the Docker helpers in this module can raise a clear ImportError
# instead of failing with a NameError when it is missing.
try:
    import docker
    from docker.errors import ImageNotFound
    DOCKER_AVAILABLE = True
except ImportError:
    DOCKER_AVAILABLE = False
43

44

45
def dump(obj, mode='json'):
    """
    Serialises a native datatype object or swagger entity to a json or yaml
    string, json being the default

    Args:
        obj (varies): The native datatype object or swagger type to serialise
        mode (str): 'json' or 'yaml', the supported export modes

    Returns (str): The serialised object

    """
    assert mode in ['json', 'yaml']
    # The generated API client reduces swagger models to plain datatypes
    sanitised = nipyapi.nifi.ApiClient().sanitize_for_serialization(obj)
    if mode == 'yaml':
        # The 'safe' pure-Python handler is used, writing into an in-memory
        # stream whose contents become the return value
        buffer = StringIO()
        YAML(typ='safe', pure=True).dump(sanitised, buffer)
        return buffer.getvalue()
    if mode == 'json':
        # Any TypeError from json.dumps propagates unchanged to the caller
        return json.dumps(obj=sanitised, sort_keys=True, indent=4)
    raise ValueError("Invalid dump Mode specified {0}".format(mode))
×
79

80

81
def load(obj, dto=None):
    """
    Parses a serialised object back into native datatypes, and optionally
    rebuilds it as a native NiFi or NiFi-Registry DTO

    Warning: Using this on objects not produced by this Package may have
    unintended results! While efforts have been made to ensure that unsafe
    loading is not possible, no stringent security testing has been completed.

    Args:
        obj (str, bytes): The serialised object to import
        dto (Optional [tuple{str, str}]): A Tuple of the service ('nifi' or
            'registry') and the name of the object type to construct,
            e.g. dto = ('registry', 'VersionedFlowSnapshot')

    Returns: Either the loaded object in native Python datatypes, or the
        constructed native datatype object

    """
    assert isinstance(obj, (six.string_types, bytes))
    assert dto is None or isinstance(dto, tuple)
    native = YAML(typ='safe', pure=True).load(obj)
    if not dto:
        return native
    assert dto[0] in ['nifi', 'registry']
    assert isinstance(dto[1], six.string_types)
    # The client deserialiser reads its payload from a response-like object,
    # so the data is re-serialised to json and attached to an empty Response
    payload = dump(native)
    carrier = Response()
    carrier.data = payload
    if 'nifi' in dto[0]:
        service_config = nipyapi.config.nifi_config
    else:
        service_config = nipyapi.config.registry_config
    return service_config.api_client.deserialize(
        response=carrier,
        response_type=dto[1]
    )
1✔
121

122

123
def fs_write(obj, file_path):
    """
    Convenience function to write an Object to a FilePath as text

    Args:
        obj (varies): The Object to write out; bytes are decoded with the
            default string encoding before being written
        file_path (str): The Full path including filename to write to

    Returns: The object that was passed in, unmodified
    """
    with io.open(str(file_path), 'w', encoding=DEF_ENCODING) as handle:
        # Decoding happens after the file has been opened for writing
        text = obj.decode(DEF_ENCODING) if isinstance(obj, bytes) else obj
        handle.write(text)
    return obj
1✔
143

144

145
def fs_read(file_path):
    """
    Convenience function to read the text content of a FilePath

    Args:
        file_path (str): The Full path including filename to read from

    Returns (str): The full content of the file, decoded with the default
        string encoding. Any IOError propagates to the caller.
    """
    with io.open(str(file_path), 'r', encoding=DEF_ENCODING) as handle:
        content = handle.read()
    return content
1✔
159

160

161
def filter_obj(obj, value, key, greedy=True):
    """
    Custom filter for lists of NiFi or NiFi-Registry objects, needed because
    the native datatypes don't have consistently named or located fields.

    The class of the first element selects a filter registered in
    nipyapi.config.registered_filters, and key selects the path of fields
    to read within each object's dict form.

    Args:
        obj (varies): the NiFi or NiFi-Registry objects to filter on
        value (str): the String value to look for
        key (str): the registered filter key to match against
        greedy (bool): If True, the value will be matched anywhere in the
            field, if False it will require exact match

    Returns: None if 0 matches, list if > 1, single Object entity if ==1

    Raises:
        TypeError: if obj cannot be indexed to find its first element
        ValueError: if the object class or the key is not registered

    """
    if isinstance(obj, list) and len(obj) == 0:
        return None
    # The class name of the first element is the lookup key, as class names
    # are unique within the NiFi DTOs
    try:
        obj_class_name = obj[0].__class__.__name__
    except (TypeError, IndexError) as err:
        _raise(
            TypeError(
                "The passed object {0} is not a filterable nipyapi object"
                .format(obj.__class__.__name__)), err)
    all_filters = nipyapi.config.registered_filters
    class_filter = all_filters.get(obj_class_name, False)
    if not class_filter:
        known_classes = ' '.join(all_filters.keys())
        raise ValueError(
            "{0} is not a registered NiPyApi filterable class, registered "
            "classes are {1}".format(obj_class_name, known_classes)
        )
    key_lookup = class_filter.get(key, False)
    if not key_lookup:
        valid_keys = ' '.join(class_filter.keys())
        raise ValueError(
            "{0} is not a registered filter method for object {1}, valid "
            "methods are {2}".format(key, obj_class_name, valid_keys)
        )

    def _field_of(item):
        # Walk the registered key path through the item's dict form
        return reduce(operator.getitem, key_lookup, item.to_dict())

    if greedy:
        matches = [item for item in obj if value in _field_of(item)]
    else:
        matches = [item for item in obj if value == _field_of(item)]
    # Return contract: nothing, the single match, or the list of matches
    if not matches:
        return None
    return matches if len(matches) > 1 else matches[0]
1✔
227

228

229
def wait_to_complete(test_function, *args, **kwargs):
    """
    Repeatedly calls a function until it returns a truthy value or a
    timeout expires

    Args:
        test_function: Callable which returns a truthy value once the target
            state is reached
        nipyapi_delay (int): Keyword argument; the number of seconds to sleep
            between attempts, defaults to config.short_retry_delay
        nipyapi_max_wait (int): Keyword argument; the maximum number of
            seconds to keep trying, defaults to config.short_max_wait
        *args: Any args to pass through to the test function
        **kwargs: Any other Keyword Args to pass through to the test function

    Returns: The first truthy value returned by test_function

    Raises:
        ValueError: if test_function has not returned a truthy value before
            the timeout expires

    """
    # functools.partial objects and callable instances carry no __name__, so
    # fall back to their repr rather than failing before the first attempt
    function_name = getattr(test_function, '__name__', repr(test_function))
    log.info("Called wait_to_complete for function %s", function_name)
    # The nipyapi_ prefixed controls are removed here so that they are not
    # forwarded to test_function
    delay = kwargs.pop('nipyapi_delay', nipyapi.config.short_retry_delay)
    max_wait = kwargs.pop('nipyapi_max_wait', nipyapi.config.short_max_wait)
    timeout = time.time() + max_wait
    while time.time() < timeout:
        log.debug("Calling test_function")
        test_result = test_function(*args, **kwargs)
        log.debug("Checking result")
        if test_result:
            log.info("Function output evaluated to True, returning output")
            return test_result
        log.info("Function output evaluated to False, sleeping...")
        time.sleep(delay)
    log.info("Hit Timeout, raising TimeOut Error")
    raise ValueError("Timed Out waiting for {0} to complete".format(
        function_name))
264

265

266
def is_endpoint_up(endpoint_url):
    """
    Tests if a URL is available for requests

    Args:
        endpoint_url (str): The URL to test

    Returns (bool): True for a 200 response, or for an SSL error (the port
        is answering but needs a certificate); False for any other status
        code, a connection error or a timeout

    """
    log.info("Called is_endpoint_up with args %s", locals())
    try:
        response = requests.get(
            endpoint_url,
            timeout=nipyapi.config.short_max_wait
        )
    except requests.exceptions.SSLError as e:
        # SSLError is a ConnectionError subclass, so it is handled first
        log.info("Got Error of type %s with details %s", type(e), str(e))
        log.info("Got OpenSSL error, port is probably up but needs Cert")
        return True
    except requests.ConnectionError as e:
        log.info("Got Error of type %s with details %s", type(e), str(e))
        log.info("Got ConnectionError, returning False")
        return False
    except requests.exceptions.Timeout as e:
        # A read timeout is not a ConnectionError; report the endpoint as
        # not up rather than letting the exception escape a bool check
        log.info("Got Error of type %s with details %s", type(e), str(e))
        log.info("Got Timeout, returning False")
        return False
    if response.status_code == 200:
        log.info("Got 200 response from endpoint, returning True")
        return True
    log.info("Got status code %s from endpoint, returning False",
             response.status_code)
    return False
×
296

297

298
def set_endpoint(endpoint_url, ssl=False, login=False, username=None, password=None):
    """Sets the endpoint when switching between instances of NiFi or other projects.

    The service is inferred from the URL: it must contain 'nifi-api' or
    'registry-api'. If an API client already exists for that service it is
    logged out and discarded so that it is recreated against the new host.

    Note that login and the SSL context are only applied when ssl is True
    and the URL contains 'https://'; otherwise login, username and password
    are not used.

    Args:
        endpoint_url (str): The URL to set as the endpoint
        ssl (bool): Whether to use SSL context for HTTPS connections
        login (bool): Whether to attempt token-based login; when False with
            ssl, client certificates from the default SSL context are used
        username (str): The username to use for login
        password (str): The password to use for login

    Returns (bool): True for success

    Raises:
        ValueError: if the service cannot be recognised from the URL
    """
    # Never write the plaintext password to the log
    logged_args = dict(locals())
    if logged_args.get('password') is not None:
        logged_args['password'] = '********'
    log.info("Called set_endpoint with args %s", logged_args)
    if 'nifi-api' in endpoint_url:
        configuration = nipyapi.config.nifi_config
        service = 'nifi'
    elif 'registry-api' in endpoint_url:
        configuration = nipyapi.config.registry_config
        service = 'registry'
    else:
        raise ValueError("Endpoint not recognised")

    log.info("Setting %s endpoint to %s", service, endpoint_url)
    if configuration.api_client:
        # Running controlled logout procedure
        nipyapi.security.service_logout(service)
        # Resetting API client so it recreates from config.host
        configuration.api_client = None

    # remove any trailing slash to avoid hard to spot errors
    configuration.host = endpoint_url.rstrip('/')

    # Set up SSL context if using HTTPS
    if ssl and 'https://' in endpoint_url:
        ssl_defaults = nipyapi.config.default_ssl_context
        if login:
            # Username/password auth with basic SSL
            nipyapi.security.set_service_ssl_context(
                service=service,
                ca_file=ssl_defaults['ca_file']
            )
            nipyapi.security.service_login(
                service, username=username, password=password
            )
        else:
            # mTLS auth with client certificates
            nipyapi.security.set_service_ssl_context(
                service=service,
                ca_file=ssl_defaults['ca_file'],
                client_cert_file=ssl_defaults['client_cert_file'],
                client_key_file=ssl_defaults['client_key_file'],
                client_key_password=ssl_defaults['client_key_password']
            )

    return True
352

353

354
# pylint: disable=R0913,R0902,R0917
355
class DockerContainer():
    """
    Plain holder describing one Docker container to deploy, used for Docker
    automation without Ansible. Requires the optional 'docker' package.
    """
    def __init__(self, name=None, image_name=None, image_tag=None, ports=None,
                 env=None, volumes=None, test_url=None, endpoint=None):
        if DOCKER_AVAILABLE:
            self.name = name
            self.image_name = image_name
            self.image_tag = image_tag
            self.ports = ports
            self.env = env
            self.volumes = volumes
            self.test_url = test_url
            self.endpoint = endpoint
            # Populated later through set_container
            self.container = None
        else:
            raise ImportError(
                "The 'docker' package is required for this class. "
                "Please install nipyapi with the 'demo' extra: "
                "pip install nipyapi[demo]"
            )

    def get_test_url_status(self):
        """
        Requests self.test_url with a 10 second timeout
        :return: the HTTP status code if a response was received, otherwise
            the String 'ConnectionError' or 'Timeout'
        """
        try:
            reply = requests.get(self.test_url, timeout=10)
            return reply.status_code
        except requests.ConnectionError:
            # Checked first, so a connect timeout reports 'ConnectionError'
            return 'ConnectionError'
        except requests.Timeout:
            return 'Timeout'

    def set_container(self, container):
        """Store the container object for this definition"""
        self.container = container

    def get_container(self):
        """Return the stored container object"""
        return self.container
×
396

397

398
# pylint: disable=W0703,R1718
399
def start_docker_containers(docker_containers, network_name='demo'):
    """
    Deploys a list of DockerContainer's on a given network

    Images are pulled if not available locally, any existing containers
    with the same names are force removed, and the new containers are
    started detached on the bridge network.

    Args:
        docker_containers (list[DockerContainer]): list of Dockers to start
        network_name (str): The name of the Docker Bridge Network to get or
            create for the Docker Containers. An existing network is used
            if exactly one network name contains this string.

    Returns: Nothing

    Raises:
        ImportError: if the 'docker' package is not installed
        EnvironmentError: if the Docker service cannot be reached, or more
            than one existing network matches network_name

    """
    if not DOCKER_AVAILABLE:
        raise ImportError(
            "The 'docker' package is required for this function. "
            "Please install nipyapi with the 'demo' extra: "
            "pip install nipyapi[demo]"
        )

    log.info("Creating Docker client using Environment Variables")
    d_client = docker.from_env()

    # Test if Docker Service is available
    try:
        d_client.version()
    except Exception as e:
        _raise(EnvironmentError("Docker Service not found"), e)

    for target in docker_containers:
        assert isinstance(target, DockerContainer)

    # Pull relevant Images, each distinct image reference only once
    log.info("Pulling relevant Docker Images if needed")
    for image in {c.image_name + ':' + c.image_tag
                  for c in docker_containers}:
        log.info("Checking image %s", image)
        try:
            d_client.images.get(image)
            log.info("Using local image for %s", image)
        except ImageNotFound:
            log.info("Pulling %s", image)
            d_client.images.pull(image)

    # Clear previous containers
    log.info("Clearing previous containers for this demo")
    target_names = {i.name for i in docker_containers}
    d_clear_list = [li for li in d_client.containers.list(all=True)
                    if li.name in target_names]
    for c in d_clear_list:
        log.info("Removing old container %s", c.name)
        c.remove(force=True)

    # Deploy/Get Network
    log.info("Getting Docker bridge network")
    d_n_list = [li for li in d_client.networks.list()
                if network_name in li.name]
    if not d_n_list:
        d_network = d_client.networks.create(
            name=network_name,
            driver='bridge',
            check_duplicate=True
        )
    elif len(d_n_list) > 1:
        raise EnvironmentError("Too many test networks found")
    else:
        d_network = d_n_list[0]
    log.info("Using Docker network: %s", d_network.name)

    # Deploy Containers
    log.info("Starting relevant Docker Containers")
    for c in docker_containers:
        log.info("Starting Container %s", c.name)
        c.set_container(d_client.containers.run(
            image=c.image_name + ':' + c.image_tag,
            detach=True,
            # Attach to the network actually selected above; its name may
            # differ from network_name when found by substring match
            network=d_network.name,
            hostname=c.name,
            name=c.name,
            ports=c.ports,
            environment=c.env,
            volumes=c.volumes,
            auto_remove=True
        ))
481

482

483
class VersionError(Exception):
    """Raised when the connected service version does not support a feature"""
485

486

487
def check_version(base, comparator=None, service='nifi',
                  default_version='0.2.0'):
    """
    Compares version base against either version comparator, or the version
    of the currently connected service instance.

    Since NiFi is java, it may return a version with -SNAPSHOT as part of it.
    As such, base, comparator and any version reported by the service are
    reduced to their major.minor.patch components before being compared.

    Args:
        base (str): The base version for the comparison test
        comparator (optional[str]): The version to compare against; if not
            given, the version is requested from the connected service
        service (str): The service to test the version against, 'nifi' or
            'registry'
        default_version (optional[str]): The version to assume a Registry is
            if neither its version info nor its swagger.json can be fetched

    Returns (int): -1/0/1 if base is lower/equal/newer than comparator

    """

    def strip_version_string(version_string):
        # Reduces the string to only the major.minor.patch version
        return '.'.join(version_string.split('-')[0].split('.')[:3])

    assert isinstance(base, six.string_types)
    assert comparator is None or isinstance(comparator, six.string_types)
    assert service in ['nifi', 'registry']
    ver_a = version.parse(strip_version_string(base))
    if comparator:
        ver_b = version.parse(strip_version_string(comparator))
    elif service == 'registry':
        try:
            reg_ver = nipyapi.system.get_registry_version_info()
            ver_b = version.parse(strip_version_string(reg_ver))
        except nipyapi.registry.rest.ApiException:
            log.warning(
                "Unable to get registry version, trying swagger.json")
            try:
                config = nipyapi.config.registry_config
                if config.api_client is None:
                    config.api_client = nipyapi.registry.ApiClient()
                reg_swagger_def = config.api_client.call_api(
                    resource_path='/swagger/swagger.json',
                    method='GET', _preload_content=False,
                    auth_settings=['tokenAuth', 'Authorization']
                )
                reg_json = load(reg_swagger_def[0].data)
                # Strip suffixes such as -SNAPSHOT here too, as is done for
                # every other version source in this function
                ver_b = version.parse(
                    strip_version_string(reg_json['info']['version'])
                )
            except nipyapi.registry.rest.ApiException:
                log.warning(
                    "Can't get registry swagger.json, assuming version %s",
                    default_version)
                ver_b = version.parse(default_version)
    else:
        ver_b = version.parse(
            strip_version_string(
                nipyapi.system.get_nifi_version_info().ni_fi_version
            )
        )
    if ver_b > ver_a:
        return -1
    if ver_b < ver_a:
        return 1
    return 0
1✔
555

556

557
def validate_parameters_versioning_support(verify_nifi=True,
                                           verify_registry=True):
    """
    Convenience method which logs a warning if the connected services are
    too old to support Parameters in version control

    Args:
        verify_nifi (bool): If True, check NiFi meets the min version (1.10)
        verify_registry (bool): If True, check Registry meets the min
            version (0.6)
    """
    # enforce_min_ver reports True when the service is below the minimum
    nifi_too_old = (
        enforce_min_ver('1.10', bool_response=True) if verify_nifi else False
    )
    registry_too_old = (
        enforce_min_ver('0.6', service='registry', bool_response=True)
        if verify_registry else False
    )
    if nifi_too_old or registry_too_old:
        log.warning("Connected NiFi Registry may not support "
                    "Parameter Contexts and they may be lost in "
                    "Version Control")
580

581

582
def validate_templates_version_support():
    """
    Checks that the connected NiFi still supports the Templates API by
    delegating to enforce_max_ver with a max version of '2'; a VersionError
    is raised if that check fails
    """
    enforce_max_ver(
        '2',
        service='nifi',
        error_message="Templates are deprecated in NiFi 2.x"
    )
1✔
587

588

589
def enforce_max_ver(max_version, bool_response=False, service='nifi', error_message=None):
    """
    Raises an error if target NiFi environment is at or above the max version

    Args:
        max_version (str): The first version in which the feature is no
            longer available
        bool_response (bool): If True, will return True instead of
            raising error
        service (str): nifi or registry
        error_message (str): Optional message to use for the error in place
            of the default

    Returns:
        (bool) True if the service is at or above max_version and
        bool_response is set, False if the service is below max_version

    Raises:
        VersionError: if the service is at or above max_version and
            bool_response is not set
    """
    # check_version returns 1 only when max_version is newer than the
    # service; 0 (equal) and -1 (service is newer) both mean "at or above"
    if check_version(max_version, service=service) != 1:
        if not bool_response:
            raise VersionError(error_message or "This function is not available "
                               "in NiFi {} or above".format(max_version))
        return True
    return False
1✔
599

600

601
def enforce_min_ver(min_version, bool_response=False, service='nifi'):
    """
    Raises an error if target NiFi environment is below the minimum version

    Args:
        min_version (str): Version to check against
        bool_response (bool): If True, will return True instead of
            raising error
        service (str): nifi or registry

    Returns:
        (bool) False if the minimum version is met; True if it is not met
        and bool_response is set

    Raises:
        VersionError: if the minimum version is not met and bool_response
            is not set
    """
    # check_version returns 1 when min_version is newer than the service
    if check_version(min_version, service=service) != 1:
        return False
    if bool_response:
        return True
    raise VersionError(
        "This function is not available "
        "before NiFi version " + str(min_version))
1✔
620

621

622
def infer_object_label_from_class(obj):
    """
    Maps a NiFi object to the STRING label that certain functions expect
        for its class.

    Args:
        obj: The object to infer the name of

    Returns:
        str label for the class; for a PortEntity this is its port_type

    Raises:
        ValueError: if a remote port is not listed in its parent Remote
            Process Group
        AssertionError: if the object class is not recognised

    """
    nifi = nipyapi.nifi
    if isinstance(obj, nifi.ProcessorEntity):
        return 'PROCESSOR'
    if isinstance(obj, nifi.FunnelEntity):
        return 'FUNNEL'
    if isinstance(obj, nifi.PortEntity):
        return obj.port_type
    if isinstance(obj, nifi.RemoteProcessGroupDTO):
        return 'REMOTEPROCESSGROUP'
    if isinstance(obj, nifi.RemoteProcessGroupPortDTO):
        # Look the port id up in the parent RPG's input list, then its
        # output list, to decide which direction the port is
        parent_rpg = nipyapi.canvas.get_remote_process_group(
            obj.group_id, True)
        directions = (
            ('input_ports', 'REMOTE_INPUT_PORT'),
            ('output_ports', 'REMOTE_OUTPUT_PORT'),
        )
        for ports_key, label in directions:
            if obj.id in [port.id for port in parent_rpg[ports_key]]:
                return label
        raise ValueError("Remote Port not present as expected in RPG")
    raise AssertionError("Object Class not recognised for this function")
×
652

653

654
def bypass_slash_encoding(service, bypass):
    """
    Switches whether the API Client for a service encodes the '/' character

    Args:
        service (str): 'nifi' or 'registry'
        bypass (bool): True adds '/' to the service's safe path characters
            so it is not encoded; False resets the safe characters to the
            configured default

    Returns:
        None

    """
    assert service in ['nifi', 'registry']
    assert isinstance(bypass, bool)
    service_config = getattr(nipyapi, service).configuration
    if not bypass:
        # A copy is assigned so the configured default is never shared
        service_config.safe_chars_for_path_param = \
            copy(nipyapi.config.default_safe_chars)
        return
    if '/' not in service_config.safe_chars_for_path_param:
        service_config.safe_chars_for_path_param += '/'
675

676

677
@contextmanager
def rest_exceptions():
    """
    Context manager which re-raises a NiFi or Registry ApiException from the
    wrapped code as a ValueError carrying the exception body
    """
    try:
        yield
    except (
            nipyapi.nifi.rest.ApiException,
            nipyapi.registry.rest.ApiException
    ) as api_error:
        _raise(ValueError(api_error.body), api_error)
1✔
685

686

687
def exception_handler(status_code=None, response=None):
    """
    Decorator factory to handle HTTP Status Exceptions

    When the decorated function raises a NiFi or Registry ApiException whose
    status equals status_code, response is returned instead; any other
    ApiException is re-raised as a ValueError carrying the exception body.
    """
    def func_wrapper(f):
        @wraps(f)
        def wrapper(*args, **kwargs):
            try:
                return f(*args, **kwargs)
            except (
                    nipyapi.nifi.rest.ApiException,
                    nipyapi.registry.rest.ApiException
            ) as api_error:
                expected = (
                    status_code is not None
                    and api_error.status == int(status_code)
                )
                if expected:
                    return response
                _raise(ValueError(api_error.body), api_error)
        return wrapper
    return func_wrapper
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc