• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Chaffelson / nipyapi / #2

25 Mar 2025 10:32AM CUT coverage: 69.867% (+69.9%) from 0.0%
#2

push

coveralls-python

web-flow
V022release (#384)

* Update history for release

* Bump version: 0.21.0 → 0.22.0

1 of 1 new or added line in 1 file covered. (100.0%)

1048 of 1500 relevant lines covered (69.87%)

0.7 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.35
/nipyapi/versioning.py
1
# -*- coding: utf-8 -*-
2

3
"""
4
For interactions with the NiFi Registry Service and related functions
5
"""
6

7
from __future__ import absolute_import
1✔
8
import logging
1✔
9
import six
1✔
10
from future.utils import raise_from as _raise
1✔
11
import nipyapi
1✔
12
# Due to line lengths, creating shortened names for these objects
13
from nipyapi.nifi import VersionControlInformationDTO as VciDTO
1✔
14
from nipyapi.registry import VersionedFlowSnapshotMetadata as VfsMd
1✔
15

16
# Public API of this module. stop_flow_ver and revert_flow_ver are public,
# documented functions defined below, so they are exported alongside the rest.
__all__ = [
    'create_registry_client', 'list_registry_clients',
    'delete_registry_client', 'get_registry_client', 'list_registry_buckets',
    'create_registry_bucket', 'delete_registry_bucket', 'get_registry_bucket',
    'save_flow_ver', 'list_flows_in_bucket', 'get_flow_in_bucket',
    'get_latest_flow_ver', 'update_flow_ver', 'get_version_info',
    'create_flow', 'create_flow_version', 'get_flow_version',
    'export_flow_version', 'import_flow_version', 'list_flow_versions',
    'deploy_flow_version', 'stop_flow_ver', 'revert_flow_ver'
]

# Module-level logger, named after this module
log = logging.getLogger(__name__)
1✔
28

29

30
def create_registry_client(name, uri, description, reg_type=None, ssl_context_service=None):
    """
    Creates a Registry Client in the NiFi Controller Services

    The shape of the creation payload depends on the result of
    nipyapi.utils.check_version('2', service='nifi'): when it returns 1 the
    uri is sent as a top-level field, otherwise it is sent as the 'url'
    property together with a registry client 'type'.
    NOTE(review): presumably a result of 1 means the connected NiFi is older
    than 2.x - confirm against nipyapi.utils.check_version.

    Args:
        name (str): The name of the new Client
        uri (str): The URI for the connection
        description (str): A description for the Client
        reg_type (str): The type of registry client to create; defaults to
            the NiFiRegistryFlowRegistryClient class name when not given
        ssl_context_service (ControllerServiceEntity): Optional SSL Context
            Service, applied in a follow-up update request after creation

    Returns:
        (FlowRegistryClientEntity): The new registry client object
    """
    for text_arg in (uri, name, description):
        assert isinstance(text_arg, six.string_types)

    legacy_payload = nipyapi.utils.check_version('2', service='nifi') == 1
    if legacy_payload:
        component = {
            'uri': uri,
            'name': name,
            'description': description
        }
    else:
        default_type = (
            'org.apache.nifi.registry.flow.NiFiRegistryFlowRegistryClient'
        )
        component = {
            'name': name,
            'description': description,
            'type': reg_type or default_type,
            'properties': {'url': uri}
        }

    creation_body = {'component': component, 'revision': {'version': 0}}
    with nipyapi.utils.rest_exceptions():
        controller = nipyapi.nifi.ControllerApi().create_flow_registry_client(
            body=creation_body
        )

    if not ssl_context_service:
        return controller

    # Update with SSL context if provided
    # Will be ignored in 1.x if set in original creation request
    update_component = dict(controller.component.to_dict())
    update_component['properties'] = {
        'url': uri,
        'ssl-context-service': ssl_context_service.id
    }
    with nipyapi.utils.rest_exceptions():
        controller = nipyapi.nifi.ControllerApi().update_flow_registry_client(
            id=controller.id,
            body={
                'component': update_component,
                'revision': {'version': controller.revision.version}
            }
        )
    return controller
1✔
95

96

97
def delete_registry_client(client, refresh=True):
    """
    Deletes a Registry Client from the list of NiFi Controller Services

    Args:
        client (FlowRegistryClientEntity): The client to delete
        refresh (bool): Whether to re-fetch the client by id before deleting,
            so that the current revision version is submitted

    Returns:
        (FlowRegistryClientEntity): The updated client object
    """
    assert isinstance(client, nipyapi.nifi.FlowRegistryClientEntity)
    with nipyapi.utils.rest_exceptions():
        target = (
            nipyapi.nifi.ControllerApi().get_flow_registry_client(client.id)
            if refresh else client
        )
        # The delete call needs both the id and the revision version
        return nipyapi.nifi.ControllerApi().delete_flow_registry_client(
            id=target.id,
            version=target.revision.version
        )
120

121

122
def list_registry_clients():
    """
    Lists the available Registry Clients in the NiFi Controller Services

    Returns:
        The response of ControllerApi().get_flow_registry_clients(); the
        client entities are read from its 'registries' attribute elsewhere
        in this module
    """
    with nipyapi.utils.rest_exceptions():
        clients = nipyapi.nifi.ControllerApi().get_flow_registry_clients()
        return clients
1✔
131

132

133
def get_registry_client(identifier, identifier_type='name'):
    """
    Filters the Registry clients to a particular identifier

    Args:
        identifier (str): the filter string
        identifier_type (str): the parameter to filter on

    Returns:
        None for no matches, Single Object for unique match,
        list(Objects) for multiple matches
    """
    with nipyapi.utils.rest_exceptions():
        candidates = list_registry_clients().registries
    # Filtering happens outside the REST exception wrapper, as in the listing
    matches = nipyapi.utils.filter_obj(
        candidates, identifier, identifier_type
    )
    return matches
1✔
148

149

150
def list_registry_buckets():
    """
    Lists all available Buckets in the NiFi Registry

    Returns:
        (list[Bucket]) objects
    """
    with nipyapi.utils.rest_exceptions():
        buckets = nipyapi.registry.BucketsApi().get_buckets()
        return buckets
1✔
159

160

161
def create_registry_bucket(name):
    """
    Creates a new Registry Bucket

    Args:
        name (str): name for the bucket, must be unique in the Registry

    Returns:
        (Bucket): The new Bucket object
    """
    with nipyapi.utils.rest_exceptions():
        new_bucket = nipyapi.registry.BucketsApi().create_bucket(
            body={'name': name}
        )
        # Record which registry host the bucket was created against
        log.debug(
            "Created bucket %s against registry connection at %s",
            new_bucket.identifier,
            nipyapi.config.registry_config.api_client.host
        )
        return new_bucket
1✔
181

182

183
def delete_registry_bucket(bucket):
    """
    Removes a bucket from the NiFi Registry

    Args:
        bucket (Bucket): the Bucket object to remove

    Returns:
        (Bucket): The updated Bucket object

    Raises:
        ValueError: wrapping any registry ApiException or AttributeError
            raised while preparing or issuing the delete request
    """
    try:
        buckets_api = nipyapi.registry.BucketsApi()
        # Fall back to revision version 0 when the bucket has no revision
        if bucket.revision is not None:
            revision_version = bucket.revision.version
        else:
            revision_version = 0
        return buckets_api.delete_bucket(
            version=revision_version,
            bucket_id=bucket.identifier
        )
    except (nipyapi.registry.rest.ApiException, AttributeError) as e:
        _raise(ValueError(e), e)
1✔
201

202

203
def get_registry_bucket(identifier, identifier_type='name', greedy=True):
    """
    Filters the Bucket list to a particular identifier

    Args:
        identifier (str): the filter string
        identifier_type (str): the param to filter on
        greedy (bool): False for exact match, True for greedy match

    Returns:
        None for no matches, Single Object for unique match,
        list(Objects) for multiple matches
    """
    with nipyapi.utils.rest_exceptions():
        all_buckets = list_registry_buckets()
    matches = nipyapi.utils.filter_obj(
        all_buckets, identifier, identifier_type, greedy=greedy
    )
    return matches
220

221

222
def list_flows_in_bucket(bucket_id):
    """
    List of all Flows in a given NiFi Registry Bucket

    Args:
        bucket_id (str): The UUID of the Bucket to fetch from

    Returns:
        (list[VersionedFlow]) objects
    """
    with nipyapi.utils.rest_exceptions():
        flows = nipyapi.registry.BucketFlowsApi().get_flows(bucket_id)
        return flows
1✔
234

235

236
def get_flow_in_bucket(bucket_id, identifier, identifier_type='name',
                       greedy=True):
    """
    Filters the Flows in a Bucket against a particular identifier

    Args:
        bucket_id (str): UUID of the Bucket to filter against
        identifier (str): The string to filter on
        identifier_type (str): The param to check
        greedy (bool): False for exact match, True for greedy match

    Returns:
        None for no matches, Single Object for unique match,
        list(Objects) for multiple matches
    """
    with nipyapi.utils.rest_exceptions():
        bucket_flows = list_flows_in_bucket(bucket_id)
    matches = nipyapi.utils.filter_obj(
        bucket_flows, identifier, identifier_type, greedy=greedy
    )
    return matches
255

256

257
# pylint: disable=R0913,R0917
258
def save_flow_ver(process_group, registry_client, bucket, flow_name=None,
                  flow_id=None, comment='', desc='', refresh=True,
                  force=False):
    """
    Adds a Process Group into NiFi Registry Version Control, or saves a new
    version to an existing VersionedFlow

    Args:
        process_group (ProcessGroupEntity): the ProcessGroup object to save
            as a new Flow Version
        registry_client (RegistryClient): The Client linked to the Registry
            which contains the Bucket to save to
        bucket (Bucket): the Bucket on the NiFi Registry to save to
        flow_name (str): A name for the VersionedFlow in the Bucket
            Note you need either a name for a new VersionedFlow, or the ID of
            an existing one to save a new version
        flow_id (Optional [str]): Identifier of an existing VersionedFlow in
            the bucket, if saving a new version to an existing flow
        comment (str): A comment for the version commit
        desc (str): A description of the VersionedFlow
        refresh (bool): Whether to refresh the object revisions before action
        force (bool): Whether to Force Commit, or just regular Commit

    Returns:
        (VersionControlInformationEntity)
    """
    target_pg = (
        nipyapi.canvas.get_process_group(process_group.id, 'id')
        if refresh else process_group
    )
    flow_details = {
        'bucket_id': bucket.identifier,
        'comments': comment,
        'description': desc,
        'flow_name': flow_name,
        'flow_id': flow_id,
        'registry_id': registry_client.id,
    }
    flow_dto = nipyapi.nifi.VersionedFlowDTO(**flow_details)
    if nipyapi.utils.check_version('1.10.0') <= 0:
        # no 'action' property in versions < 1.10
        if force:
            flow_dto.action = 'FORCE_COMMIT'
        else:
            flow_dto.action = 'COMMIT'
    with nipyapi.utils.rest_exceptions():
        nipyapi.utils.validate_parameters_versioning_support()
        versions_api = nipyapi.nifi.VersionsApi()
        request_body = nipyapi.nifi.StartVersionControlRequestEntity(
            process_group_revision=target_pg.revision,
            versioned_flow=flow_dto
        )
        return versions_api.save_to_flow_registry(
            id=target_pg.id,
            body=request_body
        )
308

309

310
def stop_flow_ver(process_group, refresh=True):
    """
    Removes a Process Group from Version Control

    Args:
        process_group (ProcessGroupEntity): the ProcessGroup to work with
        refresh (bool): Whether to refresh the object status before actioning

    Returns:
        (VersionControlInformationEntity)
    """
    with nipyapi.utils.rest_exceptions():
        target_pg = (
            nipyapi.canvas.get_process_group(process_group.id, 'id')
            if refresh else process_group
        )
        # The revision version is submitted alongside the Process Group id
        return nipyapi.nifi.VersionsApi().stop_version_control(
            id=target_pg.id,
            version=target_pg.revision.version
        )
332

333

334
def revert_flow_ver(process_group):
    """
    Attempts to roll back uncommitted changes to a Process Group to the last
    committed version

    Args:
        process_group (ProcessGroupEntity): the ProcessGroup to work with

    Returns:
        (VersionedFlowUpdateRequestEntity)
    """
    assert isinstance(process_group, nipyapi.nifi.ProcessGroupEntity)
    with nipyapi.utils.rest_exceptions():
        versions_api = nipyapi.nifi.VersionsApi()
        # The current version control information is used as the revert body
        current_vci = nipyapi.nifi.VersionsApi().get_version_information(
            process_group.id
        )
        return versions_api.initiate_revert_flow_version(
            id=process_group.id,
            body=current_vci
        )
353

354

355
def list_flow_versions(bucket_id, flow_id, registry_id=None,
                       service='registry'):
    """
    EXPERIMENTAL
    List all the versions of a given Flow in a given Bucket

    Args:
        bucket_id (str): UUID of the bucket holding the flow to be enumerated
        flow_id (str): UUID of the flow in the bucket to be enumerated
        registry_id (str): UUID of the registry client linking the bucket, only
            required if requesting flows via NiFi instead of directly Registry
        service (str): Accepts 'nifi' or 'registry', indicating which service
            to query

    Returns:
        list(VersionedFlowSnapshotMetadata) or
            (VersionedFlowSnapshotMetadataSetEntity)
    """
    assert service in ['nifi', 'registry']
    with nipyapi.utils.rest_exceptions():
        if service == 'nifi':
            # Query through NiFi, which needs the registry client id
            return nipyapi.nifi.FlowApi().get_versions(
                registry_id=registry_id,
                bucket_id=bucket_id,
                flow_id=flow_id
            )
        # Otherwise query the Registry directly
        return nipyapi.registry.BucketFlowsApi().get_flow_versions(
            bucket_id=bucket_id,
            flow_id=flow_id
        )
387

388

389
def update_flow_ver(process_group, target_version=None):
    """
    Changes a versioned flow to the specified version, or the latest version

    Args:
        process_group (ProcessGroupEntity): ProcessGroupEntity under version
            control to change
        target_version (Optional [None, Int]): Either None to move to the
            latest available version, or Int of the version number to move to

    Returns:
        The final update request status object returned by
        VersionsApi().get_update_request once the update has completed
        (expected to be a VersionedFlowUpdateRequestEntity)

    Raises:
        ValueError: if target_version is neither None nor an integer, or if
            the update request completes with a failure reason
    """
    def _running_update_flow_version():
        """
        Tests for completion of the operation

        Returns:
            (bool) True once the request completed without a failure reason,
            False while it is still running

        Raises:
            ValueError: if the request completed with a failure reason
        """
        # u_init is bound in the enclosing scope before this is first called
        status = nipyapi.nifi.VersionsApi().get_update_request(
            u_init.request.request_id
        )
        if not status.request.complete:
            return False
        if status.request.failure_reason is None:
            return True
        raise ValueError(
            "Flow Version Update did not complete successfully. "
            "Error text {0}".format(status.request.failure_reason)
        )
    with nipyapi.utils.rest_exceptions():
        vci = get_version_info(process_group)
        assert isinstance(vci, nipyapi.nifi.VersionControlInformationEntity)
        flow_vers = list_flow_versions(
            vci.version_control_information.bucket_id,
            vci.version_control_information.flow_id
        )
        if target_version is None:
            # the first version is always the latest available
            ver = flow_vers[0].version
        else:
            # otherwise the version must be an integer; six.integer_types
            # matches the check used by get_flow_version in this module
            if not isinstance(target_version, six.integer_types):
                raise ValueError("target_version must be a positive Integer to"
                                 " pick a specific available version, or None"
                                 " for the latest version to be fetched")
            ver = target_version
        u_init = nipyapi.nifi.VersionsApi().initiate_version_control_update(
            id=process_group.id,
            body=nipyapi.nifi.VersionControlInformationEntity(
                process_group_revision=vci.process_group_revision,
                version_control_information=VciDTO(
                    bucket_id=vci.version_control_information.bucket_id,
                    flow_id=vci.version_control_information.flow_id,
                    group_id=vci.version_control_information.group_id,
                    registry_id=vci.version_control_information.registry_id,
                    version=ver
                )
            )
        )
        # Poll until the update request reports completion
        nipyapi.utils.wait_to_complete(_running_update_flow_version)
        return nipyapi.nifi.VersionsApi().get_update_request(
            u_init.request.request_id
        )
454

455

456
def get_latest_flow_ver(bucket_id, flow_id):
    """
    Gets the most recent version of a VersionedFlowSnapshot from a bucket

    Args:
        bucket_id (str): the UUID of the Bucket containing the flow
        flow_id (str): the UUID of the VersionedFlow to be retrieved

    Returns:
        (VersionedFlowSnapshot)
    """
    with nipyapi.utils.rest_exceptions():
        # version=None asks get_flow_version for the latest version
        latest = get_flow_version(bucket_id, flow_id, version=None)
        return latest
471

472

473
def get_version_info(process_group):
    """
    Gets the Version Control information for a particular Process Group

    Args:
        process_group (ProcessGroupEntity): the ProcessGroup to work with

    Returns:
        (VersionControlInformationEntity)
    """
    assert isinstance(process_group, nipyapi.nifi.ProcessGroupEntity)
    with nipyapi.utils.rest_exceptions():
        versions_api = nipyapi.nifi.VersionsApi()
        return versions_api.get_version_information(process_group.id)
488

489

490
def create_flow(bucket_id, flow_name, flow_desc='', flow_type='Flow'):
    """
    Creates a new VersionedFlow stub in NiFi Registry.
    Can be used to write VersionedFlow information to without using a NiFi
    Process Group directly

    Args:
        bucket_id (str): UUID of the Bucket to write to
        flow_name (str): Name for the new VersionedFlow object
        flow_desc (Optional [str]): Description for the new VersionedFlow
            object
        flow_type (Optional [str]): Type of the VersionedFlow, should be 'Flow'

    Returns:
        (VersionedFlow)
    """
    with nipyapi.utils.rest_exceptions():
        flows_api = nipyapi.registry.BucketFlowsApi()
        # The stub starts with a version count of zero
        flow_stub = nipyapi.registry.VersionedFlow(
            name=flow_name,
            description=flow_desc,
            bucket_identifier=bucket_id,
            type=flow_type,
            version_count=0
        )
        return flows_api.create_flow(bucket_id=bucket_id, body=flow_stub)
517

518

519
def create_flow_version(flow, flow_snapshot, refresh=True):
    """
    EXPERIMENTAL

    Writes a FlowSnapshot into a VersionedFlow as a new version update

    Note that this differs from save_flow_ver which creates a new Flow Version
    containing the snapshot. This function writes a snapshot to an existing
    Flow Version. Useful in migrating Flow Versions between environments.

    Args:
        flow (VersionedFlowObject): the VersionedFlow object to write to
        flow_snapshot (VersionedFlowSnapshot): the Snapshot to write into the
            VersionedFlow
        refresh (bool): Whether to refresh the object status before actioning

    Returns:
        The new (VersionedFlowSnapshot)

    Raises:
        ValueError: if flow_snapshot is not a registry.VersionedFlowSnapshot
    """
    snapshot_cls = nipyapi.registry.VersionedFlowSnapshot
    if not isinstance(flow_snapshot, snapshot_cls):
        raise ValueError("flow_snapshot must be an instance of a "
                         "registry.VersionedFlowSnapshot object, not an {0}"
                         .format(type(flow_snapshot)))
    with nipyapi.utils.rest_exceptions():
        if refresh:
            target_flow = get_flow_in_bucket(
                bucket_id=flow.bucket_identifier,
                identifier=flow.identifier,
                identifier_type='id'
            )
        else:
            target_flow = flow
        target_bucket = get_registry_bucket(
            target_flow.bucket_identifier, 'id'
        )
        # The current version of NiFi doesn't ignore link objects passed to it
        # so they are blanked on both objects (this mutates them in place)
        target_bucket.link = None
        target_flow.link = None
        nipyapi.utils.validate_parameters_versioning_support(verify_nifi=False)
        ecs = flow_snapshot.external_controller_services
        flows_api = nipyapi.registry.BucketFlowsApi()
        # The new snapshot is numbered one past the flow's version count
        new_metadata = VfsMd(
            version=target_flow.version_count + 1,
            comments=flow_snapshot.snapshot_metadata.comments,
            bucket_identifier=target_flow.bucket_identifier,
            flow_identifier=target_flow.identifier
        )
        new_snapshot = nipyapi.registry.VersionedFlowSnapshot(
            flow=target_flow,
            bucket=target_bucket,
            flow_contents=flow_snapshot.flow_contents,
            parameter_contexts=flow_snapshot.parameter_contexts,
            external_controller_services=ecs,
            snapshot_metadata=new_metadata,
        )
        return flows_api.create_flow_version(
            bucket_id=target_bucket.identifier,
            flow_id=target_flow.identifier,
            body=new_snapshot
        )
578

579

580
def get_flow_version(bucket_id, flow_id, version=None, export=False):
    """
    Retrieves the latest, or a specific, version of a Flow

    Args:
        bucket_id (str): the UUID of the bucket containing the Flow
        flow_id (str): the UUID of the Flow to be retrieved from the Bucket
        version (Optional [None, str, int]): 'None' to retrieve the latest
            version, or a version number to get that version. Any falsy
            value is treated as a request for the latest version
        export (bool): True to get the raw json object from the server for
            export, False to get the native DataType

    Returns:
        (VersionedFlowSnapshot): If export=False, or the raw json otherwise

    WARNING: This call is impacted by
    https://issues.apache.org/jira/browse/NIFIREG-135
    Which means you sometimes can't trust the version count
    """
    assert isinstance(bucket_id, six.string_types)
    assert isinstance(flow_id, six.string_types)
    # Version needs to be coerced to str pass API client regex test
    # Even though the client specifies it as Int
    accepted_version_types = (six.string_types, six.integer_types)
    assert version is None or isinstance(version, accepted_version_types)
    assert isinstance(export, bool)
    with nipyapi.utils.rest_exceptions():
        flows_api = nipyapi.registry.BucketFlowsApi()
        if version:
            out = flows_api.get_flow_version(
                bucket_id=bucket_id,
                flow_id=flow_id,
                version_number=str(version),  # This str coercion is intended
                _preload_content=not export
            )
        else:
            out = flows_api.get_latest_flow_version(
                bucket_id,
                flow_id,
                _preload_content=not export
            )
    # For export the raw payload is read from the response's data attribute
    return out.data if export else out
1✔
625

626

627
def export_flow_version(bucket_id, flow_id, version=None, file_path=None,
                        mode='json'):
    """
    Convenience method to export the identified VersionedFlowSnapshot in the
    provided format mode.

    Args:
        bucket_id (str): the UUID of the bucket containing the Flow
        flow_id (str): the UUID of the Flow to be retrieved from the Bucket
        version (Optional [None, str, int]): 'None' to retrieve the latest
            version, or a version number as a string or integer to get that
            version
        file_path (str): The path and filename to write to. Defaults to None
            which returns the serialised obj
        mode (str): 'json' or 'yaml' to specify the encoding format

    Returns:
        (str) of the encoded Snapshot, or the result of
        nipyapi.utils.fs_write when file_path is given
    """
    assert isinstance(bucket_id, six.string_types)
    assert isinstance(flow_id, six.string_types)
    assert file_path is None or isinstance(file_path, six.string_types)
    # Integers are accepted as well as strings, matching get_flow_version
    # which takes either and coerces the version to str itself
    assert version is None or isinstance(
        version, (six.string_types, six.integer_types)
    )
    assert mode in ['yaml', 'json']
    raw_obj = get_flow_version(bucket_id, flow_id, version, export=True)
    # Round-trip through load/dump to re-encode the raw payload in `mode`
    export_obj = nipyapi.utils.dump(nipyapi.utils.load(raw_obj), mode)
    if file_path:
        return nipyapi.utils.fs_write(
            obj=export_obj,
            file_path=file_path,
        )
    return export_obj
1✔
658

659

660
def import_flow_version(bucket_id, encoded_flow=None, file_path=None,
                        flow_name=None, flow_id=None):
    """
    Imports a given encoded_flow version into the bucket and flow described,
    may optionally be passed a file to read the encoded flow_contents from.

    Note that only one of encoded_flow or file_path, and only one of flow_name
    or flow_id should be specified.

    Args:
        bucket_id (str): UUID of the bucket to write the encoded_flow version
        encoded_flow (Optional [str]): The encoded flow to import; if not
            specified file_path is read from.
        file_path (Optional [str]): The file path to read the encoded flow
            from, if not specified encoded_flow is read from.
        flow_name (Optional [str]): If this is to be the first version in a new
            flow object, then this is the String name for the flow object.
        flow_id (Optional [str]): If this is a new version for an existing flow
            object, then this is the ID of that object.

    Returns:
        The new (VersionedFlowSnapshot)

    Raises:
        ValueError: if neither or both of encoded_flow and file_path are
            given, or if neither or both of flow_name and flow_id are given
    """
    # First, decode the flow snapshot contents
    dto = ('registry', 'VersionedFlowSnapshot')
    if file_path is None and encoded_flow is not None:
        with nipyapi.utils.rest_exceptions():
            imported_flow = nipyapi.utils.load(
                encoded_flow,
                dto=dto
            )
    elif file_path is not None and encoded_flow is None:
        with nipyapi.utils.rest_exceptions():
            file_in = nipyapi.utils.fs_read(
                file_path=file_path
            )
            assert isinstance(file_in, (six.string_types, bytes))
            imported_flow = nipyapi.utils.load(
                obj=file_in,
                dto=dto
            )
            assert isinstance(
                imported_flow,
                nipyapi.registry.VersionedFlowSnapshot
            )
    else:
        # Message names the real parameter and keeps the words separated
        raise ValueError("Either file_path must point to a file for import, or"
                         " encoded_flow must be an importable object, but"
                         " not both")
    # Now handle determining which Versioned Item to write to
    if flow_id is None and flow_name is not None:
        # Case: New flow
        # create the Bucket item
        ver_flow = create_flow(
            bucket_id=bucket_id,
            flow_name=flow_name
        )
    elif flow_name is None and flow_id is not None:
        # Case: New version in existing flow
        ver_flow = get_flow_in_bucket(
            bucket_id=bucket_id,
            identifier=flow_id,
            identifier_type='id'
        )
    else:
        raise ValueError("Either flow_id must be the identifier of a flow to"
                         " add this version to, or flow_name must be a unique "
                         "name for a flow in this bucket, but not both")
    # Now write the new version
    nipyapi.utils.validate_parameters_versioning_support(verify_nifi=False)
    return create_flow_version(
        flow=ver_flow,
        flow_snapshot=imported_flow,
    )
734

735

736
# pylint: disable=R0913, R0917
737
def deploy_flow_version(parent_id, location, bucket_id, flow_id, reg_client_id,
                        version=None):
    """
    Deploys a versioned flow as a new process group inside the given parent
    process group. If version is not provided, the latest version will be
    deployed.

    Args:
        parent_id (str): The ID of the parent Process Group to create the
            new process group in.
        location (tuple[x, y]): the x,y coordinates to place the new Process
            Group under the parent; a falsy value defaults to (0, 0)
        bucket_id (str): ID of the bucket containing the versioned flow to
            deploy.
        flow_id (str): ID of the versioned flow to deploy.
        reg_client_id (str): ID of the registry client connection to use.
        version (Optional [int,str]): version to deploy, if not provided latest
            version will be deployed.

    Returns:
        (ProcessGroupEntity) of the newly deployed Process Group

    Raises:
        ValueError: if the registry client, the flow, or the requested
            version cannot be found
    """
    # Default location to (0, 0) if not provided per Issue #342
    location = location or (0, 0)
    assert isinstance(location, tuple)
    # check reg client is valid
    target_reg_client = get_registry_client(reg_client_id, 'id')
    if target_reg_client is None:
        # Fail early with a clear message rather than an AttributeError on
        # target_reg_client.id when the deploy request is built
        raise ValueError(
            "Could not find Registry Client with ID [{0}]"
            .format(reg_client_id)
        )
    # Being pedantic about checking this as API failure errors are terse
    # Using NiFi here to keep all calls within the same API client
    flow_versions = list_flow_versions(
        bucket_id=bucket_id,
        flow_id=flow_id,
        registry_id=reg_client_id,
        service='nifi'
    )
    if not flow_versions:
        raise ValueError("Could not find Flows matching Bucket ID [{0}] and "
                         "Flow ID [{1}] on Registry Client [{2}]"
                         .format(bucket_id, flow_id, reg_client_id))
    if version is None:
        target_flow = flow_versions.versioned_flow_snapshot_metadata_set
    else:
        # Compare as strings so that int and str versions both match
        target_flow = [
            x for x in flow_versions.versioned_flow_snapshot_metadata_set
            if str(x.versioned_flow_snapshot_metadata.version) == str(version)
        ]
    if not target_flow:
        available_versions = [
            str(x.versioned_flow_snapshot_metadata.version)
            for x in flow_versions.versioned_flow_snapshot_metadata_set
        ]
        raise ValueError(
            "Could not find Version [{0}] for Flow [{1}] in Bucket [{2}] on "
            "Registry Client [{3}]. Available versions are: {4}"
            .format(str(version), flow_id, bucket_id, reg_client_id,
                    ", ".join(available_versions))
        )
    # Take the highest version number among the candidates
    target_flow = sorted(
        target_flow,
        key=lambda x: x.versioned_flow_snapshot_metadata.version,
        reverse=True
    )[0].versioned_flow_snapshot_metadata
    # Issue deploy statement
    with nipyapi.utils.rest_exceptions():
        return nipyapi.nifi.ProcessGroupsApi().create_process_group(
            id=parent_id,
            body=nipyapi.nifi.ProcessGroupEntity(
                revision=nipyapi.nifi.RevisionDTO(
                    version=0
                ),
                component=nipyapi.nifi.ProcessGroupDTO(
                    position=nipyapi.nifi.PositionDTO(
                        x=float(location[0]),
                        y=float(location[1])
                    ),
                    version_control_information=VciDTO(
                        bucket_id=target_flow.bucket_identifier,
                        flow_id=target_flow.flow_identifier,
                        registry_id=target_reg_client.id,
                        version=target_flow.version
                    )
                )
            )
        )
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc