• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Chaffelson / nipyapi / #2

25 Mar 2025 10:32AM CUT coverage: 69.867% (+69.9%) from 0.0%
#2

push

coveralls-python

web-flow
V022release (#384)

* Update history for release

* Bump version: 0.21.0 → 0.22.0

1 of 1 new or added line in 1 file covered. (100.0%)

1048 of 1500 relevant lines covered (69.87%)

0.7 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

21.85
/nipyapi/security.py
1
# -*- coding: utf-8 -*-
2
# pylint: disable=C0302
3

4
"""
5
Secure connectivity management for NiPyApi
6
"""
7

8
from __future__ import absolute_import
1✔
9
import logging
1✔
10
import ssl
1✔
11
from copy import copy
1✔
12
import six
1✔
13
import urllib3
1✔
14
from future.utils import raise_from as _raise
1✔
15
import nipyapi
1✔
16

17
log = logging.getLogger(__name__)
1✔
18

19

20
__all__ = [
1✔
21
    "create_service_user",
22
    "create_service_user_group",
23
    "set_service_auth_token",
24
    "service_logout",
25
    "get_service_access_status",
26
    "add_user_to_access_policy",
27
    "update_access_policy",
28
    "get_access_policy_for_resource",
29
    "create_access_policy",
30
    "list_service_users",
31
    "get_service_user",
32
    "set_service_ssl_context",
33
    "add_user_group_to_access_policy",
34
    "bootstrap_security_policies",
35
    "service_login",
36
    "remove_service_user",
37
    "list_service_user_groups",
38
    "get_service_user_group",
39
    "remove_service_user_group",
40
]
41

42
# These are the known-valid policy actions
43
_valid_actions = ["read", "write", "delete"]
1✔
44
# These are the services that these functions know how to configure
45
_valid_services = ["nifi", "registry"]
1✔
46

47

48
def list_service_users(service="nifi"):
1✔
49
    """Lists all users of a given service, takes a service name as a string"""
50
    assert service in _valid_services
×
51
    with nipyapi.utils.rest_exceptions():
×
52
        out = getattr(nipyapi, service).TenantsApi().get_users()
×
53
    if service == "nifi":
×
54
        return out.users
×
55
    return out
×
56

57

58
def get_service_user(identifier, identifier_type="identity", service="nifi"):
1✔
59
    """
60
    Gets the unique user matching to the given identifier and type.
61

62
    Args:
63
        identifier (str): the string to search for
64
        identifier_type (str): the field to search in
65
        service (str): the name of the service
66

67
    Returns:
68
        None if no match, else single object
69

70
    """
71
    assert service in _valid_services
×
72
    assert isinstance(identifier, six.string_types)
×
73
    assert isinstance(identifier_type, six.string_types)
×
74
    obj = list_service_users(service)
×
75
    out = nipyapi.utils.filter_obj(obj, identifier, identifier_type,
×
76
                                   greedy=False)
77
    return out
×
78

79

80
def remove_service_user(user, service="nifi", strict=True):
1✔
81
    """
82
    Removes a given User from the given Service
83

84
    Args:
85
        user: [(nifi.UserEntity), (registry.User)] Target User object
86
        service (str): 'nifi' or 'registry'
87
        strict (bool): Whether to throw an error if User not found
88

89
    Returns:
90
        Updated User Entity or None
91
    """
92
    assert service in _valid_services
×
93
    if service == "registry":
×
94
        assert isinstance(user, nipyapi.registry.User)
×
95
        submit = {"id": user.identifier, "version": user.revision.version}
×
96
    else:
97
        assert isinstance(user, nipyapi.nifi.UserEntity)
×
98
        submit = {"id": user.id, "version": user.revision.version}
×
99
    assert isinstance(strict, bool)
×
100
    try:
×
101
        return getattr(nipyapi, service).TenantsApi().remove_user(**submit)
×
102
    except getattr(nipyapi, service).rest.ApiException as e:
×
103
        if "Unable to find user" in e.body or "does not exist" in e.body:
×
104
            if not strict:
×
105
                return None
×
106
        _raise(ValueError(e.body), e)
×
107

108

109
def create_service_user(identity, service="nifi", strict=True):
1✔
110
    """
111
    Attempts to create a user with the provided identity in the given service
112

113
    Args:
114
        identity (str): Identity string for the user
115
        service (str): 'nifi' or 'registry'
116
        strict (bool): If Strict, will error if user already exists
117

118
    Returns:
119
        The new (User) or (UserEntity) object
120

121
    """
122
    assert service in _valid_services
×
123
    assert isinstance(identity, six.string_types)
×
124
    assert isinstance(strict, bool)
×
125
    if service == "registry":
×
126
        user_obj = nipyapi.registry.User(identity=identity)
×
127
    else:
128
        # must be nifi
129
        user_obj = nipyapi.nifi.UserEntity(
×
130
            revision=nipyapi.nifi.RevisionDTO(version=0),
131
            component=nipyapi.nifi.UserDTO(identity=identity),
132
        )
133
    try:
×
134
        return getattr(nipyapi, service).TenantsApi().create_user(user_obj)
×
135
    except (nipyapi.nifi.rest.ApiException,
×
136
            nipyapi.registry.rest.ApiException) as e:
137
        if "already exists" in e.body and not strict:
×
138
            return get_service_user(identity, service=service)
×
139
        _raise(ValueError(e.body), e)
×
140

141

142
def create_service_user_group(identity, service="nifi", users=None,
1✔
143
                              strict=True):
144
    """
145
    Attempts to create a user with the provided identity and member users in
146
    the given service
147

148
    Args:
149
        identity (str): Identity string for the user group
150
        service (str): 'nifi' or 'registry'
151
        users (list): A list of nifi.UserEntity or registry.User
152
          belonging to the group
153
        strict (bool): Whether to throw an error on already exists
154

155
    Returns:
156
        The new (UserGroup) or (UserGroupEntity) object
157

158
    """
159
    assert service in _valid_services
×
160
    assert isinstance(identity, six.string_types)
×
161

162
    users_ids = None
×
163

164
    if service == "nifi":
×
165
        if users:
×
166
            assert all(isinstance(user, nipyapi.nifi.UserEntity)
×
167
                       for user in users)
168
            users_ids = [{"id": user.id} for user in users]
×
169
        user_group_obj = nipyapi.nifi.UserGroupEntity(
×
170
            revision=nipyapi.nifi.RevisionDTO(version=0),
171
            component=nipyapi.nifi.UserGroupDTO(identity=identity,
172
                                                users=users_ids),
173
        )
174
    else:
175
        if users:
×
176
            assert all(isinstance(user, nipyapi.registry.User)
×
177
                       for user in users)
178
            users_ids = [{"identifier": user.identifier} for user in users]
×
179
        user_group_obj = nipyapi.registry.UserGroup(identity=identity,
×
180
                                                    users=users_ids)
181
    try:
×
182
        return getattr(nipyapi, service).TenantsApi().create_user_group(
×
183
            user_group_obj)
184
    except (nipyapi.nifi.rest.ApiException,
×
185
            nipyapi.registry.rest.ApiException) as e:
186
        if "already exists" in e.body:
×
187
            if not strict:
×
188
                return get_service_user_group(identity, service=service)
×
189
        _raise(ValueError(e.body), e)
×
190

191

192
def list_service_user_groups(service="nifi"):
1✔
193
    """
194
    Returns list of service user groups for a given service
195
    Args:
196
        service (str): 'nifi' or 'registry'
197

198
    Returns:
199
        [(nifi.UserGroupEntity, registry.UserGroup)]
200

201
    """
202
    assert service in _valid_services
×
203
    with nipyapi.utils.rest_exceptions():
×
204
        out = getattr(nipyapi, service).TenantsApi().get_user_groups()
×
205
    if service == "nifi":
×
206
        return out.user_groups
×
207
    return out
×
208

209

210
def get_service_user_group(identifier, identifier_type="identity",
1✔
211
                           service="nifi"):
212
    """
213
    Gets the unique group matching to the given identifier and type.
214

215
    Args:
216
        identifier (str): the string to search for
217
        identifier_type (str): the field to search in, identity or id
218
        service (str): the name of the service
219

220
    Returns:
221
        None if no match, else single object
222

223
    """
224
    assert service in _valid_services
×
225
    assert isinstance(identifier, six.string_types)
×
226
    assert isinstance(identifier_type, six.string_types)
×
227
    obj = list_service_user_groups(service)
×
228
    out = nipyapi.utils.filter_obj(obj, identifier, identifier_type,
×
229
                                   greedy=False)
230
    return out
×
231

232

233
def remove_service_user_group(group, service="nifi", strict=True):
1✔
234
    """
235
    Removes a given User Group from the given Service
236

237
    Args:
238
        group: [(nifi.UserEntity), (registry.User)] Target User object
239
        service (str): 'nifi' or 'registry'
240
        strict (bool): Whether to throw an error if User not found
241

242
    Returns:
243
        Updated User Group or None
244
    """
245
    assert service in _valid_services
×
246
    if service == "registry":
×
247
        assert isinstance(group, nipyapi.registry.UserGroup)
×
248
        submit = {"id": group.identifier, "version": group.revision.version}
×
249
    else:
250
        assert isinstance(group, nipyapi.nifi.UserGroupEntity)
×
251
        submit = {"id": group.id, "version": group.revision.version}
×
252
    assert isinstance(strict, bool)
×
253
    try:
×
254
        return getattr(nipyapi, service).TenantsApi().remove_user_group(
×
255
            **submit)
256
    except getattr(nipyapi, service).rest.ApiException as e:
×
257
        if "Unable to find user" in e.body or "does not exist" in e.body:
×
258
            if not strict:
×
259
                return None
×
260
        _raise(ValueError(e.body), e)
×
261

262

263
def service_login(service="nifi", username=None, password=None,
264
                  bool_response=False):
265
    """
266
    Login to the currently configured NiFi or NiFi-Registry server.
267

268
    Login requires a secure connection over https.
269
    Prior to calling this method, the host must be specified
270
    and the SSLContext should be configured (if necessary).
271

272
    Successful login will result in a generated token (JWT) being
273
    cached in the api_client config that will be passed in all future
274
    REST API calls. To clear that token, call service_logout.
275

276
    The token is temporary and will expire after a duration set by
277
    the server. After a token expires, you must call
278
    this method again to generate a new token.
279

280
    Args:
281
        service (str): 'nifi' or 'registry'; the service to login to
282
        username (str): The username to submit
283
        password (str): The password to use
284
        bool_response (bool): If True, the function will return False instead
285
            of an error. Useful for connection testing.
286

287
    Returns:
288
        (bool): True if successful, False or an Error if not. See bool_response
289

290
    """
291
    log_args = locals()
292
    log_args["password"] = "REDACTED"
293
    log.info("Called service_login with args %s", log_args)
294
    assert service in _valid_services
295
    assert username is None or isinstance(username, six.string_types)
296
    assert password is None or isinstance(password, six.string_types)
297
    assert isinstance(bool_response, bool)
298

299
    configuration = getattr(nipyapi, service).configuration
300
    assert configuration.host, "Host must be set prior to logging in."
301
    assert configuration.host.startswith(
302
        "https"
303
    ), "Login is only available when connecting over HTTPS."
304
    default_pword = getattr(nipyapi.config, "default_" + service + "_password")
305
    default_uname = getattr(nipyapi.config, "default_" + service + "_username")
306
    # We use copy so we don't clobber the default by mistake
307
    pword = password if password is not None else copy(default_pword)
308
    uname = username if username is not None else copy(default_uname)
309
    assert pword, "Password must be set or in default config"
310
    assert uname, "Username must be set or in default config"
311
    # set username/password in configuration for initial login
312
    # Registry pulls from config, NiFi allows submission
313
    configuration.username = uname
314
    configuration.password = pword
315
    log.info(
316
        "Attempting tokenAuth login with user identity [%s]",
317
        configuration.username
318
    )
319
    try:
320
        if service == "nifi":
321
            token = nipyapi.nifi.AccessApi().create_access_token(
322
                username=uname, password=pword
323
            )
324
        else:
325
            token = (
326
                nipyapi.registry.AccessApi()
327
                .create_access_token_using_basic_auth_credentials()
328
            )
329
        set_service_auth_token(token=token, service=service)
330
        return True
331
    except getattr(nipyapi, service).rest.ApiException as e:
332
        if bool_response:
333
            return False
334
        _raise(ValueError(e.body), e)
335

336

337
def set_service_auth_token(token=None, token_name="tokenAuth", service="nifi"):
1✔
338
    """
339
    Helper method to set the auth token correctly for the specified service
340

341
    Args:
342
        token (Optional[str]): The token to set. Defaults to None.
343
        token_name (str): the api_key field name to set the token to. Defaults
344
            to 'tokenAuth'
345
        service (str): 'nifi' or 'registry', the service to set
346

347
    Returns:
348
        (bool): True on success, False if token not set
349
    """
350
    assert service in _valid_services
1✔
351
    assert isinstance(token_name, six.string_types)
1✔
352
    assert token is None or isinstance(token, six.string_types)
1✔
353
    if service == "registry":
1✔
354
        configuration = nipyapi.config.registry_config
1✔
355
    else:
356
        configuration = nipyapi.config.nifi_config
1✔
357
    if token:
1✔
358
        configuration.api_key[token_name] = token
1✔
359
        configuration.api_key_prefix[token_name] = "Bearer"
1✔
360
    else:
361
        # If not token, then assume we are doing logout and cleanup
362
        if token_name in configuration.api_key:
1✔
363
            configuration.api_key.pop(token_name)
1✔
364
    if token_name not in configuration.api_key:
1✔
365
        return False
1✔
366
    return True
1✔
367

368

369
def service_logout(service="nifi"):
1✔
370
    """
371
    Logs out from the service by resetting the token
372
    Args:
373
        service (str): 'nifi' or 'registry'; the target service
374

375
    Returns:
376
        (bool): True of access removed, False if still set
377

378
    """
379
    assert service in _valid_services
1✔
380
    set_service_auth_token(token=None, service=service)
1✔
381
    try:
1✔
382
        status = get_service_access_status(service, bool_response=True)
1✔
383
    except ValueError as e:
×
384
        if "Cannot set verify_mode to CERT_NONE" in str(e):
×
385
            status = None
×
386
            # Logout throws error with incorrect ssl setup
387
        else:
388
            raise e
×
389
    if not status:
1✔
390
        return True
×
391
    return False
1✔
392

393

394
def get_service_access_status(service="nifi", bool_response=False):
1✔
395
    """
396
    Gets the access status for the current session
397

398
    Args:
399
        service (str): A String of 'nifi' or 'registry' to indicate which
400
            service to check status for
401
        bool_response (bool): If True, the function will return False on
402
            hitting an Error instead of raising it. Useful for connection
403
            testing.
404

405
    Returns:
406
        (bool) if bool_response, else the Service Access Status of the User
407
    """
408
    log.info("Called get_service_access_status with args %s", locals())
1✔
409
    assert service in _valid_services
1✔
410
    assert isinstance(bool_response, bool)
1✔
411
    if bool_response:
1✔
412
        # Assume we are using this as a connection test and therefore disable
413
        # the Warnings urllib3 will shower us with
414
        log.debug("- bool_response is True, disabling urllib3 warnings")
1✔
415
        logging.getLogger("urllib3").setLevel(logging.ERROR)
1✔
416
    try:
1✔
417
        out = getattr(nipyapi, service).AccessApi().get_access_status()
1✔
418
        log.info("Got server response, returning")
1✔
419
        return out
1✔
420
    except urllib3.exceptions.MaxRetryError as e:
×
421
        log.debug("- Caught exception %s", type(e))
×
422
        if bool_response:
×
423
            log.debug(
×
424
                "Connection failed with error %s and bool_response is "
425
                "True, returning False",
426
                e,
427
            )
428
            return False
×
429
        log.debug("- bool_response is False, raising Exception")
×
430
        raise e
×
431
    except getattr(nipyapi, service).rest.ApiException as e:
×
432
        expected_errors = [
×
433
            "Authentication object was not found",
434
            "only supported when running over HTTPS",
435
        ]
436
        if any(x in e.body for x in expected_errors):
×
437
            if bool_response:
×
438
                return False
×
439
            raise e
×
440

441

442
def add_user_to_access_policy(user, policy, service="nifi", refresh=True,
1✔
443
                              strict=True):
444
    """
445
    Attempts to add the given user object to the given access policy
446

447
    Args:
448
        user (User) or (UserEntity): User object to add
449
        policy (AccessPolicyEntity) or (AccessPolicy): Access Policy object
450
        service (str): 'nifi' or 'registry' to identify the target service
451
        refresh (bool): Whether to refresh the policy object before submit
452
        strict (bool): If True, will return error if user already present,
453
          if False will ignore the already exists
454

455
    Returns:
456
        Updated Policy object
457

458
    """
459
    assert service in _valid_services
×
460
    assert isinstance(
×
461
        policy,
462
        (
463
            nipyapi.registry.AccessPolicy
464
            if service == "registry"
465
            else nipyapi.nifi.AccessPolicyEntity
466
        ),
467
    )
468
    assert isinstance(
×
469
        user,
470
        nipyapi.registry.User if service == "registry"
471
        else nipyapi.nifi.UserEntity,
472
    )
473

474
    user_id = user.id if service == "nifi" else user.identifier
×
475

476
    if refresh:
×
477
        policy_tgt = (
×
478
            getattr(nipyapi, service)
479
            .PoliciesApi()
480
            .get_access_policy(policy.id if service == "nifi"
481
                               else policy.identifier)
482
        )
483
    else:
484
        policy_tgt = policy
×
485

486
    assert isinstance(
×
487
        policy_tgt,
488
        (
489
            nipyapi.registry.AccessPolicy
490
            if service == "registry"
491
            else nipyapi.nifi.AccessPolicyEntity
492
        ),
493
    )
494

495
    policy_users = (
×
496
        policy_tgt.users if service == "registry"
497
        else policy_tgt.component.users
498
    )
499
    policy_user_ids = [
×
500
        i.identifier if service == "registry" else i.id for i in policy_users
501
    ]
502
    if user_id not in policy_user_ids:
×
503
        if service == "registry":
×
504
            policy_tgt.users.append(user)
×
505
        elif service == "nifi":
×
506
            policy_tgt.component.users.append({"id": user_id})
×
507

508
        return nipyapi.security.update_access_policy(policy_tgt, service)
×
509
    if strict and user_id in policy_user_ids:
×
510
        raise ValueError("Strict is True and User ID already in Policy")
×
511

512

513
def add_user_group_to_access_policy(user_group, policy, service="nifi",
1✔
514
                                    refresh=True):
515
    """
516
    Attempts to add the given user group object to the given access policy
517

518
    Args:
519
        user_group (UserGroup) or (UserGroupEntity): User group object to add
520
        policy (AccessPolicyEntity) or (AccessPolicy): Access Policy object
521
        service (str): 'nifi' or 'registry' to identify the target service
522
        refresh (bool): Whether to refresh the policy object before submission
523

524
    Returns:
525
        Updated Policy object
526

527
    """
528
    assert service in _valid_services
×
529
    assert isinstance(
×
530
        policy,
531
        (
532
            nipyapi.registry.AccessPolicy
533
            if service == "registry"
534
            else nipyapi.nifi.AccessPolicyEntity
535
        ),
536
    )
537
    assert isinstance(
×
538
        user_group,
539
        (
540
            nipyapi.registry.UserGroup
541
            if service == "registry"
542
            else nipyapi.nifi.UserGroupEntity
543
        ),
544
    )
545
    user_group_id = (
×
546
        user_group.id if service == "nifi" else user_group.identifier
547
    )
548

549
    if refresh:
×
550
        policy_tgt = (
×
551
            getattr(nipyapi, service)
552
            .PoliciesApi()
553
            .get_access_policy(policy.id if service == "nifi"
554
                               else policy.identifier)
555
        )
556
    else:
557
        policy_tgt = policy
×
558

559
    assert isinstance(
×
560
        policy_tgt,
561
        (
562
            nipyapi.registry.AccessPolicy
563
            if service == "registry"
564
            else nipyapi.nifi.AccessPolicyEntity
565
        ),
566
    )
567

568
    policy_user_groups = (
×
569
        policy_tgt.users if service == "registry"
570
        else policy_tgt.component.user_groups
571
    )
572
    policy_user_group_ids = [
×
573
        i.identifier if service == "registry" else i.id
574
        for i in policy_user_groups
575
    ]
576

577
    assert user_group_id not in policy_user_group_ids
×
578

579
    if service == "registry":
×
580
        policy_tgt.user_groups.append(user_group)
×
581
    elif service == "nifi":
×
582
        policy_tgt.component.user_groups.append({"id": user_group_id})
×
583

584
    return nipyapi.security.update_access_policy(policy_tgt, service)
×
585

586

587
def update_access_policy(policy, service="nifi"):
1✔
588
    """
589
    Applies an updated access policy to the service indicated
590

591
    Args:
592
        policy (PolicyEntity): The policy object to submit
593
        service (str): 'nifi' or 'registry' to indicate the target service
594

595
    Returns:
596
        (PolicyEntity): The updated policy if successful
597

598
    """
599
    assert service in _valid_services
×
600
    assert isinstance(
×
601
        policy,
602
        (
603
            nipyapi.registry.AccessPolicy
604
            if service == "registry"
605
            else nipyapi.nifi.AccessPolicyEntity
606
        ),
607
    ), "Policy type {0} not valid.".format(type(policy))
608
    with nipyapi.utils.rest_exceptions():
×
609
        return (
×
610
            getattr(nipyapi, service)
611
            .PoliciesApi()
612
            .update_access_policy(
613
                id=policy.id if service == "nifi" else policy.identifier,
614
                body=policy
615
            )
616
        )
617

618

619
def get_access_policy_for_resource(
1✔
620
    resource, action, r_id=None, service="nifi", auto_create=False
621
):
622
    """
623
    Attempts to retrieve the access policy for a given resource and action,
624
    and optionally resource_id if targeting NiFi. Optionally creates the policy
625
    if it doesn't already exist
626

627
    Args:
628
        resource (str): A valid resource in the target service
629
        action (str): A valid action, typically 'read', 'write' or 'delete'
630
        r_id (Optional[str]): The UUID of the resource, valid only if targeting
631
            NiFi resources
632
        service (str): Which service to target, typically 'nifi' or 'registry'
633
        auto_create (bool): Whether to create the targeted policy if it doesn't
634
            already exist
635

636
    Returns:
637
        The relevant AccessPolicy object
638

639
    """
640
    assert service in _valid_services
×
641
    assert action in _valid_actions
×
642
    assert r_id is None or isinstance(r_id, six.string_types)
×
643
    assert isinstance(resource, six.string_types)
×
644
    assert isinstance(auto_create, bool)
×
645
    log.info("Called get_access_policy_for_resource with Args %s", locals())
×
646

647
    # Strip leading '/' from resource as lookup endpoint prepends a '/'
648
    resource = resource[1:] if resource.startswith("/") else resource
×
649
    log.info("Getting %s Policy for %s:%s:%s", service, action, resource,
×
650
             str(r_id))
651
    if service == "nifi":
×
652
        pol_api = nipyapi.nifi.PoliciesApi()
×
653
    else:
654
        pol_api = nipyapi.registry.PoliciesApi()
×
655
    try:
×
656
        nipyapi.utils.bypass_slash_encoding(service, True)
657
        response = pol_api.get_access_policy_for_resource(
×
658
            action=action,
659
            resource="/".join([resource, r_id]) if r_id else resource
660
        )
661
        nipyapi.utils.bypass_slash_encoding(service, False)
662
        return response
×
663
    except nipyapi.nifi.rest.ApiException as e:
×
664
        if any(
×
665
            pol_string in e.body
666
            for pol_string in [
667
                "Unable to find access policy",
668
                "No policy found",
669
                "No access policy found",
670
            ]
671
        ):
672
            log.info("Access policy not found")
×
673
            if not auto_create:
×
674
                return None
×
675
            return nipyapi.security.create_access_policy(
×
676
                resource, action, r_id, service
677
            )
678
        log.info("Unexpected Error, raising...")
×
679
        _raise(ValueError(e.body), e)
×
680
    finally:
681
        nipyapi.utils.bypass_slash_encoding(service, False)
682

683

684
def create_access_policy(resource, action, r_id=None, service="nifi"):
1✔
685
    """
686
    Creates an access policy for the given resource, action and optionally
687
    resource_id for NiFi.
688

689
    Args:
690
        resource (str): a valid resource type for this service, e.g. 'bucket'
691
        action (str): a valid action type for this service, typically 'read',
692
            'write' or 'delete'
693
        r_id (optional[str]): if NiFi, the resource ID of the resource
694
        service (str): the service to target
695

696
    Returns:
697
        An access policy object for that service
698

699
    """
700
    assert isinstance(resource, six.string_types)
×
701
    assert action in _valid_actions
×
702
    assert r_id is None or isinstance(r_id, six.string_types)
×
703
    assert service in _valid_services
×
704
    if resource[0] != "/":
×
705
        r = "/" + resource
×
706
    else:
707
        r = resource
×
708
    with nipyapi.utils.rest_exceptions():
×
709
        if service == "nifi":
×
710
            return nipyapi.nifi.PoliciesApi().create_access_policy(
×
711
                body=nipyapi.nifi.AccessPolicyEntity(
712
                    revision=nipyapi.nifi.RevisionDTO(version=0),
713
                    component=nipyapi.nifi.AccessPolicyDTO(
714
                        action=action,
715
                        resource="/".join([r, r_id]) if r_id else r
716
                    ),
717
                )
718
            )
719
        # elif service == 'registry':
720
        return nipyapi.registry.PoliciesApi().create_access_policy(
×
721
            body=nipyapi.registry.AccessPolicy(action=action, resource=r)
722
        )
723

724

725
# pylint: disable=R0913, R0917
726
def set_service_ssl_context(
727
    service="nifi",
728
    ca_file=None,
729
    client_cert_file=None,
730
    client_key_file=None,
731
    client_key_password=None,
732
    check_hostname=None,
733
    purpose=None,
734
):
735
    """
736
    Create an SSLContext for connecting over https to a secured NiFi or
737
    NiFi-Registry instance.
738

739
    This method can be used to create an SSLContext for
740
    two-way TLS in which a client cert is used by the service to authenticate
741
    the client.
742

743
    This method can also be used for one-way TLS in which the client
744
    verifies the server's certificate, but authenticates using a
745
    different form of credentials, such as LDAP username/password.
746

747
    If you are using one-way TLS with a certificate signed by a
748
    root CA trusted by your system/platform, this step is not
749
    necessary as the default TLS-handshake should "just work."
750

751
    Args:
752
        service (str): 'nifi' or 'registry' to indicate which service
753
            config to set the ssl context to
754
        ca_file (str): A PEM file containing certs for the root CA(s)
755
            for the NiFi Registry server
756
        client_cert_file (str): A PEM file containing the public
757
            certificates for the user/client identity
758
        client_key_file (str): An encrypted (password-protected) PEM file
759
            containing the client's secret key
760
        client_key_password (str): The password to decrypt the client_key_file
761
        check_hostname (bool): Enable or Disable hostname checking
762
        purpose (ssl.Purpose): The purpose of the SSLContext
763

764
    Returns:
765
        (None)
766
    """
767
    assert service in ["nifi", "registry"]
768
    ssl_context = ssl.create_default_context(
769
        purpose=purpose or ssl.Purpose.SERVER_AUTH
770
        )
771
    if client_cert_file is not None and client_key_file is not None:
772
        try:
773
            ssl_context.load_cert_chain(
774
                certfile=client_cert_file,
775
                keyfile=client_key_file,
776
                password=client_key_password,
777
            )
778
        except FileNotFoundError as e:
779
            _raise(
780
                FileNotFoundError(
781
                    "Unable to read keyfile {0} or certfile {1}".format(
782
                        client_key_file, client_cert_file
783
                    )
784
                ),
785
                e,
786
            )
787
        except ssl.SSLError as e:
788
            if e.errno == 9:
789
                _raise(
790
                    ssl.SSLError(
791
                        "This error possibly pertains to a mis-typed or "
792
                        "incorrect key password"
793
                    ),
794
                    e,
795
                )
796

797
    if ca_file is not None:
798
        ssl_context.load_verify_locations(cafile=ca_file)
799

800
    if check_hostname is not None:
801
        ssl_context.check_hostname = check_hostname
802
    else:
803
        ssl_context.check_hostname = nipyapi.config.global_ssl_host_check
804

805
    if service == "registry":
806
        nipyapi.config.registry_config.ssl_context = ssl_context
807
    elif service == "nifi":
808
        nipyapi.config.nifi_config.ssl_context = ssl_context
809

810

811
# pylint: disable=W0702,R0912,r0914
812
def bootstrap_security_policies(service, user_identity=None, group_identity=None):
1✔
813
    """Creates a default security context within NiFi or Nifi-Registry.
814

815
    Args:
816
        service (str): The service to configure security for ('nifi' or 'registry')
817
        user_identity (nipyapi.nifi.UserEntity or nipyapi.registry.User, optional):
818
            User identity to apply policies to
819
        group_identity (nipyapi.nifi.UserGroupEntity or nipyapi.registry.UserGroup, optional):
820
            Group identity to apply policies to
821

822
    Returns:
823
        None
824
    """
825
    assert service in _valid_services, "service not in %s" % _valid_services
×
826
    valid_ident_obj = [nipyapi.nifi.UserEntity, nipyapi.registry.User]
×
827
    if user_identity is not None:
×
828
        assert type(user_identity) in valid_ident_obj
×
829

830
    if "nifi" in service:
×
831
        rpg_id = nipyapi.canvas.get_root_pg_id()
×
832
        if user_identity is None and group_identity is None:
×
833
            # Try to find user by certificate DN if using mTLS
834
            nifi_user_identity = nipyapi.security.get_service_user(
×
835
                nipyapi.config.default_mtls_identity, service="nifi"
836
            )
837
            # Fall back to default username if not found
838
            if not nifi_user_identity:
×
839
                nifi_user_identity = nipyapi.security.get_service_user(
×
840
                    nipyapi.config.default_nifi_username, service="nifi"
841
                )
842
        else:
843
            nifi_user_identity = user_identity
×
844

845
        access_policies = [
×
846
            ("write", "process-groups", rpg_id),
847
            ("read", "process-groups", rpg_id),
848
            ("write", "data/process-groups", rpg_id),
849
            ("read", "data/process-groups", rpg_id),
850
            ("read", "system", None),
851
            ("read", "system-diagnostics", None),
852
            ("read", "policies", None),
853
        ]
854
        for pol in access_policies:
×
855
            ap = nipyapi.security.get_access_policy_for_resource(
×
856
                action=pol[0],
857
                resource=pol[1],
858
                r_id=pol[2],
859
                service="nifi",
860
                auto_create=True,
861
            )
862
            if nifi_user_identity is None:
×
863
                # I should not rely upon a try/catch there
864
                # but it's the simplest way (I just hope it won't
865
                # break the server :-) )
866
                try:
×
867
                    nipyapi.security.add_user_group_to_access_policy(
×
868
                        user_group=group_identity,
869
                        policy=ap,
870
                        service="nifi"
871
                    )
872
                except:  # noqa
×
873
                    pass
874
            else:
875
                nipyapi.security.add_user_to_access_policy(
×
876
                    user=nifi_user_identity,
877
                    policy=ap,
878
                    service="nifi",
879
                    strict=False
880
                )
881
    else:
882
        if user_identity is None and group_identity is None:
×
883
            # Try to find user by certificate DN if using mTLS
884
            reg_user_identity = nipyapi.security.get_service_user(
×
885
                nipyapi.config.default_mtls_identity, service="registry"
886
            )
887
            # Fall back to default username if not found
888
            if not reg_user_identity:
×
889
                reg_user_identity = nipyapi.security.get_service_user(
×
890
                    nipyapi.config.default_registry_username,
891
                    service="registry"
892
                )
893
        else:
894
            reg_user_identity = user_identity
×
895

896
        all_buckets_access_policies = [
×
897
            ("read", "/buckets"),
898
            ("write", "/buckets"),
899
            ("delete", "/buckets"),
900
        ]
901
        for action, resource in all_buckets_access_policies:
×
902
            pol = nipyapi.security.get_access_policy_for_resource(
×
903
                resource=resource,
904
                action=action,
905
                service="registry",
906
                auto_create=True
907
            )
908
            if reg_user_identity is None:
×
909
                if group_identity:  # Only try to add group if it exists
×
910
                    nipyapi.security.add_user_group_to_access_policy(
×
911
                        user_group=group_identity,
912
                        policy=pol,
913
                        service="registry"
914
                    )
915
            else:
916
                nipyapi.security.add_user_to_access_policy(
×
917
                    user=reg_user_identity,
918
                    policy=pol,
919
                    service="registry",
920
                    strict=False
921
                )
922
        # get the identity of the user as a string
923
        if isinstance(reg_user_identity, nipyapi.registry.User):
×
924
            reg_user_ident_str = reg_user_identity.identity
×
925
        else:
926
            reg_user_ident_str = reg_user_identity
×
927
        # Setup Proxy Access
928
        nifi_proxy_user = nipyapi.security.create_service_user(
×
929
            identity=reg_user_ident_str,
930
            service="registry",
931
            strict=False
932
        )
933
        proxy_access_policies = [
×
934
            ("read", "/proxy"),
935
            ("write", "/proxy"),
936
            ("delete", "/proxy"),
937
        ]
938
        for action, resource in proxy_access_policies:
×
939
            pol = nipyapi.security.get_access_policy_for_resource(
×
940
                resource=resource,
941
                action=action,
942
                service="registry",
943
                auto_create=True
944
            )
945
            nipyapi.security.add_user_to_access_policy(
×
946
                user=nifi_proxy_user,
947
                policy=pol,
948
                service="registry",
949
                strict=False
950
            )
951

952

953
def create_ssl_context_controller_service(
954
        parent_pg, name, keystore_file, keystore_password, truststore_file, truststore_password,
955
        key_password=None, keystore_type=None, truststore_type=None, ssl_protocol=None,
956
        ssl_service_type=None):
957
    """
958
    Creates and configures an SSL Context Service for secure client connections.
959
    Note that once created it can be listed and deleted using the standard canvas functions.
960

961
    Args:
962
        parent_pg (ProcessGroupEntity): The Process Group to create the service in
963
        name (str): Name for the SSL Context Service
964
        keystore_file (str): Path to the client certificate/keystore file
965
        keystore_password (str): Password for the keystore
966
        truststore_file (str): Path to the truststore file
967
        truststore_password (str): Password for the truststore
968
        key_password (Optional[str]): Password for the key, defaults to keystore_password if not set
969
        keystore_type (Optional[str]): Type of keystore (JKS, PKCS12), defaults to JKS
970
        truststore_type (Optional[str]): Type of truststore (JKS, PKCS12), defaults to JKS
971
        ssl_protocol (Optional[str]): SSL protocol version, defaults to TLS
972
        ssl_service_type (Optional[str]): SSL service type, defaults to
973
            StandardRestrictedSSLContextService
974

975
    Returns:
976
        (ControllerServiceEntity): The configured SSL Context Service
977
    """
978
    assert isinstance(parent_pg, nipyapi.nifi.ProcessGroupEntity)
979
    assert isinstance(name, six.string_types)
980
    assert isinstance(keystore_file, six.string_types)
981
    assert isinstance(keystore_password, six.string_types)
982
    assert isinstance(truststore_file, six.string_types)
983
    assert isinstance(truststore_password, six.string_types)
984
    assert key_password is None or isinstance(key_password, six.string_types)
985
    assert keystore_type is None or isinstance(keystore_type, six.string_types)
986
    assert truststore_type is None or isinstance(truststore_type, six.string_types)
987
    assert ssl_protocol is None or isinstance(ssl_protocol, six.string_types)
988

989
    default_ssl_service_type = 'org.apache.nifi.ssl.StandardRestrictedSSLContextService'
990
    with nipyapi.utils.rest_exceptions():
991
        return nipyapi.nifi.ControllerApi().create_controller_service(
992
            body=nipyapi.nifi.ControllerServiceEntity(
993
                revision=nipyapi.nifi.RevisionDTO(
994
                    version=0
995
                ),
996
                component=nipyapi.nifi.ControllerServiceDTO(
997
                    type=ssl_service_type or default_ssl_service_type,
998
                    name=name,
999
                    properties={
1000
                        'Keystore Filename': keystore_file,
1001
                        'Keystore Password': keystore_password,
1002
                        'key-password': key_password or keystore_password,
1003
                        'Keystore Type': keystore_type or 'JKS',
1004
                        'Truststore Filename': truststore_file,
1005
                        'Truststore Password': truststore_password,
1006
                        'Truststore Type': truststore_type or 'JKS',
1007
                        'SSL Protocol': ssl_protocol or 'TLS'
1008
                    }
1009
                )
1010
            )
1011
        )
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc