• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Chaffelson / nipyapi / #2

25 Mar 2025 10:32AM CUT coverage: 69.867% (+69.9%) from 0.0%
#2

push

coveralls-python

web-flow
V022release (#384)

* Update history for release

* Bump version: 0.21.0 → 0.22.0

1 of 1 new or added line in 1 file covered. (100.0%)

1048 of 1500 relevant lines covered (69.87%)

0.7 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.04
/nipyapi/canvas.py
1
# -*- coding: utf-8 -*-
2
# pylint: disable=C0302
3

4
"""
5
For interactions with the NiFi Canvas.
6
"""
7

8
from __future__ import absolute_import
1✔
9
import logging
1✔
10
import six
1✔
11
from future.utils import raise_from as _raise
1✔
12
import nipyapi
1✔
13
from nipyapi.utils import exception_handler
1✔
14

15
# Names exported as the public API of this module.
__all__ = [
    "get_root_pg_id", "recurse_flow", "get_flow", "get_process_group_status",
    "get_process_group", "list_all_process_groups", "delete_process_group",
    "schedule_process_group", "create_process_group", "list_all_processors",
    "list_all_processor_types", "get_processor_type", 'create_processor',
    'delete_processor', 'get_processor', 'schedule_processor', 'get_funnel',
    'update_processor', 'get_variable_registry', 'update_variable_registry',
    'purge_connection', 'purge_process_group', 'schedule_components',
    'get_bulletins', 'get_bulletin_board', 'list_invalid_processors',
    'list_sensitive_processors', 'list_all_connections', 'create_connection',
    'delete_connection', 'get_component_connections', 'create_controller',
    'list_all_controllers', 'delete_controller', 'update_controller',
    'schedule_controller', 'get_controller', 'list_all_controller_types',
    'list_all_by_kind', 'list_all_input_ports', 'list_all_output_ports',
    'list_all_funnels', 'list_all_remote_process_groups', 'delete_funnel',
    'get_remote_process_group', 'update_process_group', 'create_funnel',
    'create_remote_process_group', 'delete_remote_process_group',
    'set_remote_process_group_transmission', 'get_pg_parents_ids',
    'delete_port', 'create_port'
]

# Module-level logger, named after this module.
log = logging.getLogger(__name__)
1✔
37

38

39
def get_root_pg_id():
    """
    Look up the UUID of the Root Process Group.

    Asks the Flow API for the status of the 'root' alias and reads the id
    from the returned status object.

    Returns (str): The UUID of the root PG
    """
    root_status = nipyapi.nifi.FlowApi().get_process_group_status('root')
    return root_status.process_group_status.id
47

48

49
def recurse_flow(pg_id='root'):
    """
    Fetches the flow of a Process Group together with all nested child flows.

    Every child Process Group found is given a 'nipyapi_extended' attribute
    holding that child's own flow, so the whole tree hangs off the returned
    object. The walk is driven by an explicit work list rather than function
    recursion, as recursion broke on large NiFi environments.

    Args:
        pg_id (str): The Process Group UUID

    Returns:
         (ProcessGroupFlowEntity): enriched NiFi Flow object
    """
    assert isinstance(pg_id, six.string_types), "pg_id should be a string"

    out = get_flow(pg_id)
    # Work list of (child id, child object awaiting its extended flow)
    pending = [
        (child.id, child)
        for child in out.process_group_flow.flow.process_groups
    ]
    while pending:
        child_id, child_obj = pending.pop()
        child_flow = get_flow(child_id)
        child_obj.nipyapi_extended = child_flow
        # Queue the grandchildren discovered in this child's flow
        pending.extend(
            [(grandchild.id, grandchild) for grandchild in
             child_flow.process_group_flow.flow.process_groups]
        )
    return out
1✔
74

75

76
def get_flow(pg_id='root'):
    """
    Fetches the flow of a single Process Group via the native Flow API.

    Only the requested Process Group is retrieved; use 'recurse_flow' to
    also collect the flows of all nested child groups.

    Args:
        pg_id (str): id of the Process Group to retrieve, defaults to the root
            process group if not set

    Returns:
         (ProcessGroupFlowEntity): The Process Group object
    """
    assert isinstance(pg_id, six.string_types), "pg_id should be a string"
    with nipyapi.utils.rest_exceptions():
        flow_api = nipyapi.nifi.FlowApi()
        return flow_api.get_flow(pg_id)
1✔
93

94

95
def get_process_group_status(pg_id='root', detail='names'):
    """
    Retrieves a Process Group and returns it whole or as a name:id summary.

    The data comes from the ProcessGroupsApi 'get_process_group' call rather
    than the dedicated 'process group status' call, as the latter returns
    only a subset of the same data.

    Args:
        pg_id (str): The UUID of the Process Group
        detail (str): 'names' or 'all'; whether to return a simple dict of
            name:id pairings, or the full details. Defaults to 'names'

    Returns:
         (dict): {name: id} of the Process Group when detail is 'names'
         (ProcessGroupEntity): The full entity, including the status,
         when detail is 'all'
    """
    assert isinstance(pg_id, six.string_types), "pg_id should be a string"
    assert detail in ['names', 'all']
    pg_entity = nipyapi.nifi.ProcessGroupsApi().get_process_group(id=pg_id)
    if detail == 'names':
        return {pg_entity.component.name: pg_entity.component.id}
    return pg_entity
1✔
120

121

122
@exception_handler(404, None)
def get_process_group(identifier, identifier_type='name', greedy=True):
    """
    Finds Process Groups whose name or id matches the given identifier.

    A lookup by id, or for the special identifier 'root', is a single direct
    API fetch. Any other lookup lists every Process Group on the canvas and
    filters that list.

    Args:
        identifier (str): the string to filter the list for
        identifier_type (str): the field to filter on, 'name' or 'id'
        greedy (bool): True for partial match, False for exact match

    Returns:
        None for no matches, Single Object for unique match,
        list(Objects) for multiple matches

    """
    assert isinstance(identifier, six.string_types)
    assert identifier_type in ['name', 'id']
    with nipyapi.utils.rest_exceptions():
        # An id is assumed unique and 'root' is a special alias; fetching
        # these directly avoids recursing the entire canvas
        direct_fetch = identifier_type == 'id' or identifier == 'root'
        if not direct_fetch:
            candidates = list_all_process_groups()
            out = nipyapi.utils.filter_obj(
                candidates, identifier, identifier_type, greedy=greedy)
        else:
            out = nipyapi.nifi.ProcessGroupsApi().get_process_group(identifier)
    return out
1✔
150

151

152
# pylint: disable=R1737
def list_all_process_groups(pg_id='root'):
    """
    Collects every Process Group at or below the given one into a flat list.
    Potentially slow if you have a large canvas.

    ProcessGroupsApi().get_process_groups(pg_id) only provides the first
    layer of pgs, whereas this walks the whole tree under pg_id. The starting
    Process Group itself is appended as the last element of the result.

    Args:
        pg_id (str): The UUID of the Process Group to start from, defaults to
            the Canvas root

    Returns:
         list[ProcessGroupEntity]

    """
    assert isinstance(pg_id, six.string_types), "pg_id should be a string"

    def flatten(parent_flow):
        """
        Depth-first walk over a flow enriched by recurse_flow.

        Args:
            parent_flow: flow object whose child process groups each carry
                a 'nipyapi_extended' flow of their own

        Yields:
            Each descendant ProcessGroupEntity, descendants of a child
            before the child itself
        """
        for child in parent_flow.process_group_flow.flow.process_groups:
            for descendant in flatten(child.nipyapi_extended):
                yield descendant
            yield child

    # Build the enriched tree, then flatten all descendants
    enriched_flow = recurse_flow(pg_id)
    out = list(flatten(enriched_flow))
    # The starting group has no parent to list it, so fetch it directly and
    # give it the same 'nipyapi_extended' detail as its descendants
    start_entity = get_process_group(pg_id, 'id')
    start_entity.nipyapi_extended = enriched_flow
    out.append(start_entity)
    return out
203

204

205
def list_invalid_processors(pg_id='root', summary=False):
    """
    Lists every Processor under a Process Group that reports validation
    errors.

    Args:
        pg_id (str): The UUID of the Process Group to start from, defaults to
            the Canvas root
        summary (bool): True to return, per Processor, only a dict with its
            'id' and its validation errors under 'summary'; False for the
            full listing

    Returns:
        list[ProcessorEntity] or list[dict]
    """
    assert isinstance(pg_id, six.string_types), "pg_id should be a string"
    assert isinstance(summary, bool)
    invalid = [
        proc for proc in list_all_processors(pg_id)
        if proc.component.validation_errors
    ]
    if not summary:
        return invalid
    return [
        {'id': proc.id, 'summary': proc.component.validation_errors}
        for proc in invalid
    ]
×
228

229

230
def list_sensitive_processors(pg_id='root', summary=False):
    """
    Lists every Processor under a Process Group that has at least one
    sensitive property, i.e. one that would need to be managed during
    deployment.

    Processor types already found to be sensitive are remembered in
    nipyapi.config.cache under 'list_sensitive_processors', so later
    Processors of the same type are matched without inspecting their
    descriptors.

    Args:
        pg_id (str): The UUID of the Process Group to start from, defaults to
            the Canvas root
        summary (bool): True to return, per Processor, a dict of its id to
            the names of its sensitive properties; False for the full listing

    Returns:
        list[ProcessorEntity] or list(dict)
    """
    assert isinstance(pg_id, six.string_types), "pg_id should be a string"
    assert isinstance(summary, bool)
    known_types = nipyapi.config.cache.get('list_sensitive_processors') or []
    matches = []
    for proc in list_all_processors(pg_id):
        if proc.component.type in known_types:
            matches.append(proc)
            continue
        has_sensitive = any(
            descriptor.sensitive is True
            for descriptor in proc.component.config.descriptors.values()
        )
        if has_sensitive:
            matches.append(proc)
            known_types.append(str(proc.component.type))
    # Only store the cache once there is something in it
    if known_types:
        nipyapi.config.cache['list_sensitive_processors'] = known_types
    if not summary:
        return matches
    return [
        {proc.id: [
            prop_name
            for prop_name, descriptor
            in proc.component.config.descriptors.items()
            if descriptor.sensitive is True]}
        for proc in matches
    ]
×
273

274

275
def list_all_processors(pg_id='root'):
    """
    Lists all Processors in the given Process Group and its descendants.

    Args:
        pg_id (str): The UUID of the Process Group to start from, defaults to
            the Canvas root

    Returns:
         list[ProcessorEntity]
    """
    assert isinstance(pg_id, six.string_types), "pg_id should be a string"

    pg_api = nipyapi.nifi.ProcessGroupsApi
    if nipyapi.utils.check_version('1.7.0') <= 0:
        # Presumably NiFi 1.7.0 or newer (confirm against check_version):
        # a single call can return the Processors of all descendant groups
        listing = pg_api().get_processors(
            id=pg_id,
            include_descendant_groups=True
        )
        return listing.processors
    # Older NiFi instances: query each Process Group in the tree in turn
    out = []
    for this_pg in list_all_process_groups(pg_id):
        found = pg_api().get_processors(this_pg.id)
        if found.processors:
            out.extend(found.processors)
    return out
×
305

306

307
def schedule_process_group(process_group_id, scheduled):
    """
    Start or Stop a Process Group and all components.

    This does not guarantee that every component has started, as some may
    be in Invalid states. When stopping, the call waits until the Process
    Group reports zero active threads.

    Args:
        process_group_id (str): The UUID of the target Process Group
        scheduled (bool): True to start, False to stop

    Returns:
         (bool): True if successfully scheduled, False if not

    """
    assert isinstance(process_group_id, six.string_types)
    assert isinstance(scheduled, bool)

    def _running_schedule_process_group(pg_id_):
        # True once the Process Group has no active threads left
        current = nipyapi.nifi.ProcessGroupsApi().get_process_group(pg_id_)
        return current.status.aggregate_snapshot.active_thread_count == 0

    assert isinstance(
        get_process_group(process_group_id, 'id'),
        nipyapi.nifi.ProcessGroupEntity
    )
    result = schedule_components(
        pg_id=process_group_id,
        scheduled=scheduled
    )
    # A failed request, or any start request, is reported as-is
    if not result or scheduled:
        return result
    # A stop was accepted: wait for the Process Group threads to halt
    stopped = nipyapi.utils.wait_to_complete(
        _running_schedule_process_group,
        process_group_id
    )
    if not stopped:
        # The stop was scheduled, but the threads did not halt
        return False
    return result
1✔
355

356

357
def delete_process_group(process_group, force=False, refresh=True):
    """
    Deletes a given Process Group, with optional prejudice.

    A plain removal is attempted first. If the API rejects it and force is
    set, the Process Group is purged, its connections within the parent group
    and its own Controller Services are deleted, Templates tied to it are
    removed on versions that have them, and the removal is retried.

    Args:
        process_group (ProcessGroupEntity): The target Process Group
        force (bool): Stop, purge and clean the target Process Group before
            deletion. Experimental.
        refresh (bool): Whether to refresh the state first

    Returns:
         (ProcessGroupEntity): The updated object state

    Raises:
        ValueError: if the removal is rejected and force is False

    """
    assert isinstance(process_group, nipyapi.nifi.ProcessGroupEntity)
    assert isinstance(force, bool)
    assert isinstance(refresh, bool)
    pg_id = process_group.id
    target = process_group
    if refresh or force:
        target = nipyapi.nifi.ProcessGroupsApi().get_process_group(pg_id)
    try:
        return nipyapi.nifi.ProcessGroupsApi().remove_process_group(
            id=target.id,
            version=target.revision.version,
            client_id=target.revision.client_id
        )
    except nipyapi.nifi.rest.ApiException as e:
        if not force:
            _raise(ValueError(e.body), e)
        # Look up the parent group before anything is torn down
        this_pg = nipyapi.canvas.get_process_group(pg_id, 'id')
        parent_pg_id = this_pg.component.parent_group_id
        # Stop, drop, and roll.
        purge_process_group(target, stop=True)
        # Delete connections in the parent group that touch this group
        for con in list_all_connections(parent_pg_id):
            if pg_id in [con.destination_group_id, con.source_group_id]:
                delete_connection(con)
        # Delete Controller Services owned inside this group only; those
        # belonging to a parent group are left untouched
        removed_controllers_id = []
        controllers = list_all_controllers(pg_id)
        parent_pgs_id = get_pg_parents_ids(pg_id)
        for controller in controllers:
            already_removed = controller.component.id in removed_controllers_id
            if not already_removed and \
                    controller.component.parent_group_id not in parent_pgs_id:
                delete_controller(controller, True)
                removed_controllers_id.append(controller.component.id)

        # Templates are not supported in NiFi 2.x
        if nipyapi.utils.check_version('2', service='nifi') == 1:
            for template in nipyapi.templates.list_all_templates(native=False):
                if target.id == template.template.group_id:
                    nipyapi.templates.delete_template(template.id)
        # The revision has moved on after the changes above, so refresh
        target = nipyapi.nifi.ProcessGroupsApi().get_process_group(pg_id)
        return nipyapi.nifi.ProcessGroupsApi().remove_process_group(
            id=target.id,
            version=target.revision.version,
            client_id=target.revision.client_id
        )
1✔
419

420

421
def create_process_group(parent_pg, new_pg_name, location, comment=''):
    """
    Creates a named Process Group inside the given parent Process Group at
    the given canvas position.

    Args:
        parent_pg (ProcessGroupEntity): The parent Process Group to create the
            new process group in
        new_pg_name (str): The name of the new Process Group
        location (tuple[x, y]): the x,y coordinates to place the new Process
            Group under the parent
        comment (str): Entry for the Comments field

    Returns:
         (ProcessGroupEntity): The new Process Group

    """
    assert isinstance(parent_pg, nipyapi.nifi.ProcessGroupEntity)
    assert isinstance(new_pg_name, six.string_types)
    assert isinstance(location, tuple)
    with nipyapi.utils.rest_exceptions():
        pg_api = nipyapi.nifi.ProcessGroupsApi()
        position = nipyapi.nifi.PositionDTO(
            x=float(location[0]),
            y=float(location[1])
        )
        component = nipyapi.nifi.ProcessGroupDTO(
            name=new_pg_name,
            position=position,
            comments=comment
        )
        # A brand new component starts at revision version 0
        body = nipyapi.nifi.ProcessGroupEntity(
            revision={'version': 0},
            component=component
        )
        return pg_api.create_process_group(id=parent_pg.id, body=body)
456

457

458
def list_all_processor_types():
    """
    Fetches all processor types available in the NiFi instance from the
    Flow API.

    Returns:
         list(ProcessorTypesEntity): A native datatype containing the
         processors list

    """
    with nipyapi.utils.rest_exceptions():
        flow_api = nipyapi.nifi.FlowApi()
        return flow_api.get_processor_types()
1✔
469

470

471
def get_processor_type(identifier, identifier_type='name', greedy=True):
    """
    Finds the processor type(s) matching the given identifier.

    Args:
        identifier (str): the string to filter the list for
        identifier_type (str): the field to filter on, set in config.py
        greedy (bool): False for exact match, True for greedy match

    Returns:
        None for no matches, Single Object for unique match,
        list(Objects) for multiple matches

    """
    with nipyapi.utils.rest_exceptions():
        available = list_all_processor_types().processor_types
    if not available:
        # Nothing to filter; hand back the empty result unchanged
        return available
    return nipyapi.utils.filter_obj(
        available, identifier, identifier_type, greedy=greedy
    )
×
492

493

494
def create_processor(parent_pg, processor, location, name=None, config=None):
    """
    Creates a Processor of the given type inside a Process Group.

    Args:
        parent_pg (ProcessGroupEntity): The parent Process Group
        processor (DocumentedTypeDTO): The abstract processor type object to be
            instantiated
        location (tuple[x, y]): The location coordinates
        name (Optional [str]):  The name for the new Processor; defaults to
            the last dot-separated segment of the processor type
        config (Optional [ProcessorConfigDTO]): A configuration object for the
            new processor; defaults to an empty ProcessorConfigDTO

    Returns:
         (ProcessorEntity): The new Processor

    """
    assert isinstance(parent_pg, nipyapi.nifi.ProcessGroupEntity)
    assert isinstance(processor, nipyapi.nifi.DocumentedTypeDTO)
    processor_name = (
        name if name is not None else processor.type.split('.')[-1]
    )
    target_config = (
        config if config is not None else nipyapi.nifi.ProcessorConfigDTO()
    )
    with nipyapi.utils.rest_exceptions():
        pg_api = nipyapi.nifi.ProcessGroupsApi()
        component = nipyapi.nifi.ProcessorDTO(
            position=nipyapi.nifi.PositionDTO(
                x=float(location[0]),
                y=float(location[1])
            ),
            type=processor.type,
            name=processor_name,
            config=target_config
        )
        # A brand new component starts at revision version 0
        body = nipyapi.nifi.ProcessorEntity(
            revision={'version': 0},
            component=component
        )
        return pg_api.create_processor(id=parent_pg.id, body=body)
537

538

539
@exception_handler(404, None)
def get_processor(identifier, identifier_type='name', greedy=True):
    """
    Finds Processors whose name or id matches the given identifier.

    A lookup by id is a single direct API fetch; a lookup by name lists all
    Processors and filters that list.

    Args:
        identifier (str): The String to filter against
        identifier_type (str): The field to apply the filter to, 'name' or
            'id'
        greedy (bool): Whether to exact match (False) or partial match (True)

    Returns:
        None for no matches, Single Object for unique match,
        list(Objects) for multiple matches
    """
    assert isinstance(identifier, six.string_types)
    assert identifier_type in ['name', 'id']
    if identifier_type != 'id':
        candidates = list_all_processors()
        return nipyapi.utils.filter_obj(
            candidates, identifier, identifier_type, greedy=greedy
        )
    return nipyapi.nifi.ProcessorsApi().get_processor(identifier)
1✔
565

566

567
def delete_processor(processor, refresh=True, force=False):
    """
    Deletes a Processor from the canvas, with optional prejudice.

    Args:
        processor (ProcessorEntity): The processor to delete
        refresh (bool): Whether to refresh the Processor state before action
        force (bool): Whether to stop the Processor and delete (with purge)
            its inbound connections before deletion. Behavior may change in
            future releases.

    Returns:
         (ProcessorEntity): The updated ProcessorEntity, or None if a
         refresh shows the Processor no longer exists

    Raises:
        ValueError: if force is set and the Processor could not be stopped

    """
    assert isinstance(processor, nipyapi.nifi.ProcessorEntity)
    assert isinstance(refresh, bool)
    assert isinstance(force, bool)
    target = processor
    if refresh or force:
        target = get_processor(processor.id, 'id')
        if target is None:
            # Processor does not exist
            return None
        assert isinstance(target, nipyapi.nifi.ProcessorEntity)
    if force:
        stopped = schedule_processor(target, False)
        if not stopped:
            raise ValueError("Could not prepare processor {0} for deletion"
                             .format(target.id))
        # Only connections that end at this Processor are removed here
        inbound_cons = [
            con for con in get_component_connections(processor)
            if processor.id == con.destination_id
        ]
        for con in inbound_cons:
            delete_connection(con, purge=True)
        # The revision may have moved on, so refresh before deleting
        target = get_processor(processor.id, 'id')
        assert isinstance(target, nipyapi.nifi.ProcessorEntity)
    with nipyapi.utils.rest_exceptions():
        processors_api = nipyapi.nifi.ProcessorsApi()
        return processors_api.delete_processor(
            id=target.id,
            version=target.revision.version
        )
609

610

611
def schedule_components(pg_id, scheduled, components=None):
    """
    Requests a new scheduled state for a Process Group, or for a chosen list
    of components within it.

    This only updates the scheduling target; it does not guarantee that the
    components will actually be Started or Stopped afterwards.

    Args:
        pg_id (str): The UUID of the parent Process Group
        scheduled (bool): True to start, False to stop
        components (list[ComponentType]): The list of Component Entities to
            schedule, e.g. ProcessorEntity's

    Returns:
         (bool): True for success, False for not

    """
    assert isinstance(
        get_process_group(pg_id, 'id'),
        nipyapi.nifi.ProcessGroupEntity
    )
    assert isinstance(scheduled, bool)
    assert components is None or isinstance(components, list)
    if scheduled:
        target_state = 'RUNNING'
    else:
        target_state = 'STOPPED'
    body = nipyapi.nifi.ScheduleComponentsEntity(
        id=pg_id,
        state=target_state
    )
    if components:
        # Restrict the request to the listed components, keyed by id and
        # carrying each one's revision
        body.components = {comp.id: comp.revision for comp in components}
    with nipyapi.utils.rest_exceptions():
        result = nipyapi.nifi.FlowApi().schedule_components(
            id=pg_id,
            body=body
        )
    # Success means the API echoed back the state that was asked for
    return result.state == target_state
×
650

651

652
def schedule_processor(processor, scheduled, refresh=True):
    """
    Set a Processor to Start or Stop.

    Note that this doesn't guarantee that it will change state, merely that
    it will be instructed to try. After the scheduling request is accepted,
    the call waits for the Processor to report state 'RUNNING' when
    starting, or zero active threads when stopping.

    Args:
        processor (ProcessorEntity): The Processor to target
        scheduled (bool): True to start, False to stop
        refresh (bool): Whether to refresh the object before action

    Returns:
        (bool): True for success, False for failure. False is also returned
        when the scheduling request itself is not accepted.

    """
    assert isinstance(processor, nipyapi.nifi.ProcessorEntity)
    assert isinstance(scheduled, bool)
    assert isinstance(refresh, bool)

    def _running_schedule_processor(processor_):
        # True once the Processor has no active threads left
        test_obj = nipyapi.canvas.get_processor(processor_.id, 'id')
        if test_obj.status.aggregate_snapshot.active_thread_count == 0:
            return True
        log.info("Processor not stopped, active thread count %s",
                 test_obj.status.aggregate_snapshot.active_thread_count)
        return False

    def _starting_schedule_processor(processor_):
        # True once the Processor reports that it is running
        test_obj = nipyapi.canvas.get_processor(processor_.id, 'id')
        if test_obj.component.state == 'RUNNING':
            return True
        log.info("Processor not started, run_status %s",
                 test_obj.component.state)
        return False

    if refresh:
        target = nipyapi.canvas.get_processor(processor.id, 'id')
        assert isinstance(target, nipyapi.nifi.ProcessorEntity)
    else:
        target = processor
    result = schedule_components(
        pg_id=target.status.group_id,
        scheduled=scheduled,
        components=[target]
    )
    if not result:
        # The scheduling request was not accepted. Return an explicit False
        # rather than falling off the end of the function with None.
        return False
    # Pick the check that matches the requested direction and wait on it
    if scheduled:
        state_test = _starting_schedule_processor
    else:
        state_test = _running_schedule_processor
    if nipyapi.utils.wait_to_complete(state_test, target):
        return result
    # The request was accepted, but the Processor never reached the state
    return False
×
720

721

722
def update_process_group(pg, update, refresh=True):
    """
    Applies a set of field updates to a given Process Group.

    Args:
        pg (ProcessGroupEntity): The Process Group to target for update
        update (dict): key:value pairs to update; passed as keyword
            arguments to ProcessGroupDTO
        refresh (bool): Whether to refresh the Process Group before
            applying the update

    Returns:
        (ProcessGroupEntity): The updated ProcessGroupEntity

    """
    assert isinstance(pg, nipyapi.nifi.ProcessGroupEntity)
    with nipyapi.utils.rest_exceptions():
        if refresh:
            pg = get_process_group(pg.id, 'id')
        changes = nipyapi.nifi.ProcessGroupDTO(
            id=pg.id,
            **update
        )
        body = nipyapi.nifi.ProcessGroupEntity(
            component=changes,
            id=pg.id,
            revision=pg.revision
        )
        return nipyapi.nifi.ProcessGroupsApi().update_process_group(
            id=pg.id,
            body=body
        )
752

753

754
def update_processor(processor, update, refresh=True):
    """
    Applies new configuration parameters to a given Processor.

    An example update would be:
    nifi.ProcessorConfigDTO(scheduling_period='3s')

    Args:
        processor (ProcessorEntity): The Processor to target for update
        update (ProcessorConfigDTO): The new configuration parameters
        refresh (bool): Whether to refresh the Processor object state
          before applying the update

    Returns:
        (ProcessorEntity): The updated ProcessorEntity

    Raises:
        ValueError: if update is not a ProcessorConfigDTO

    """
    update_is_valid = isinstance(update, nipyapi.nifi.ProcessorConfigDTO)
    if not update_is_valid:
        raise ValueError(
            "update param is not an instance of nifi.ProcessorConfigDTO"
        )
    with nipyapi.utils.rest_exceptions():
        if refresh:
            processor = get_processor(processor.id, 'id')
        changes = nipyapi.nifi.ProcessorDTO(
            config=update,
            id=processor.id
        )
        body = nipyapi.nifi.ProcessorEntity(
            component=changes,
            revision=processor.revision,
        )
        return nipyapi.nifi.ProcessorsApi().update_processor(
            id=processor.id,
            body=body
        )
788

789

790
def get_variable_registry(process_group, ancestors=True):
    """
    Fetches the Variable Registry belonging to a Process Group

    Args:
        process_group (ProcessGroupEntity): The Process Group whose
            Variable Registry should be returned
        ancestors (bool): Forwarded to the API as include_ancestor_groups,
            i.e. whether the registries of ancestor Process Groups are
            included as well

    Returns:
        (VariableRegistryEntity): The Variable Registry

    """
    with nipyapi.utils.rest_exceptions():
        pg_api = nipyapi.nifi.ProcessGroupsApi()
        return pg_api.get_variable_registry(
            process_group.id,
            include_ancestor_groups=ancestors
        )
809

810

811
def update_variable_registry(process_group, update, refresh=True):
    """
    Writes one or more key:value pairs into a Process Group's variable
    registry

    Args:
        process_group (ProcessGroupEntity): The Process Group owning the
            Variable Registry to be updated
        update (list[tuple]): The variables to write; element 0 of each
            item is used as the name and element 1 as the value
        refresh (bool): Whether to re-fetch the Process Group, and with it
            its revision, before submitting the update

    Returns:
        (VariableRegistryEntity): The created or updated Variable Registry
        Entries

    Raises:
        ValueError: If process_group is not a nifi.ProcessGroupEntity or
            update is not a list

    """
    if not isinstance(process_group, nipyapi.nifi.ProcessGroupEntity):
        raise ValueError(
            'param process_group is not a valid nifi.ProcessGroupEntity'
        )
    if not isinstance(update, list):
        raise ValueError(
            'param update is not a valid list of (key,value) tuples'
        )
    # Wrap each (name, value) pair in the native datatype
    variables = []
    for pair in update:
        variable = nipyapi.nifi.VariableDTO(
            name=pair[0],
            value=pair[1],
            process_group_id=process_group.id
        )
        variables.append(nipyapi.nifi.VariableEntity(variable))
    with nipyapi.utils.rest_exceptions():
        if refresh:
            process_group = get_process_group(process_group.id, 'id')
        registry = nipyapi.nifi.VariableRegistryDTO(
            process_group_id=process_group.id,
            variables=variables
        )
        return nipyapi.nifi.ProcessGroupsApi().update_variable_registry(
            id=process_group.id,
            body=nipyapi.nifi.VariableRegistryEntity(
                process_group_revision=process_group.revision,
                variable_registry=registry
            )
        )
857

858

859
def create_connection(source, target, relationships=None, name=None):
    """
    Connects two canvas objects for the given relationships

    Args:
        source: Object the connection starts from, e.g. ProcessorEntity
        target: Object the connection ends at, e.g. FunnelEntity
        relationships (list): optional list of relationship name strings
            to connect; when omitted, every relationship of the source is
            connected. Not consulted for port or funnel sources
        name (str): optional Name for the Connection, defaults to None

    Returns:
        (ConnectionEntity): for the created connection

    """
    # the connectable type labels are derived from the supplied classes
    infer_label = nipyapi.utils.infer_object_label_from_class
    source_type = infer_label(source)
    target_type = infer_label(target)
    if source_type not in ['OUTPUT_PORT', 'INPUT_PORT', 'FUNNEL']:
        source_rels = [rel.name for rel in source.component.relationships]
        if not relationships:
            # nothing requested, so connect every relationship
            relationships = source_rels
        else:
            assert all(i in source_rels for i in relationships), (
                "One or more relationships [{0}] not in list of valid "
                "Source Relationships [{1}]"
            ).format(str(relationships), str(source_rels))
    if source_type != 'OUTPUT_PORT':
        parent_id = source.component.parent_group_id
    else:
        # An Output port connection to another process group is hosted by
        # the common parent process group
        parent_pg = get_process_group(source.component.parent_group_id, 'id')
        if parent_pg.id == get_root_pg_id():
            parent_id = parent_pg.id
        else:
            parent_id = parent_pg.component.parent_group_id
    with nipyapi.utils.rest_exceptions():
        source_end = nipyapi.nifi.ConnectableDTO(
            id=source.id,
            group_id=source.component.parent_group_id,
            type=source_type
        )
        target_end = nipyapi.nifi.ConnectableDTO(
            id=target.id,
            group_id=target.component.parent_group_id,
            type=target_type
        )
        return nipyapi.nifi.ProcessGroupsApi().create_connection(
            id=parent_id,
            body=nipyapi.nifi.ConnectionEntity(
                revision=nipyapi.nifi.RevisionDTO(
                    version=0
                ),
                source_type=source_type,
                destination_type=target_type,
                component=nipyapi.nifi.ConnectionDTO(
                    source=source_end,
                    name=name,
                    destination=target_end,
                    selected_relationships=relationships
                )
            )
        )
922

923

924
def delete_connection(connection, purge=False):
    """
    Removes a connection, purging it beforehand when requested

    Args:
        connection (ConnectionEntity): Connection to delete
        purge (bool): True to call purge_connection on it first,
            Defaults to False

    Returns:
        (ConnectionEntity): the modified Connection

    """
    assert isinstance(connection, nipyapi.nifi.ConnectionEntity)
    if purge:
        purge_connection(connection.id)
    with nipyapi.utils.rest_exceptions():
        con_api = nipyapi.nifi.ConnectionsApi()
        return con_api.delete_connection(
            id=connection.id,
            version=connection.revision.version
        )
944

945

946
def list_all_connections(pg_id='root', descendants=True):
    """
    Convenience wrapper for list_all_by_kind for connections

    Args:
        pg_id (str): ID of the Process Group to retrieve Connections from
        descendants (bool): True to recurse child PGs, False to not

    Returns:
        (list): List of ConnectionEntity objects

    """
    return list_all_by_kind(
        kind='connections',
        pg_id=pg_id,
        descendants=descendants
    )
1✔
959

960

961
def get_component_connections(component):
    """
    Lists the Connections which start or end at a given Processor

    Args:
        component (ProcessorEntity): Component Object to filter by; the
            search covers Connections listed for its parent Process Group

    Returns:
        (list): List of ConnectionEntity Objects
    """
    assert isinstance(component, nipyapi.nifi.ProcessorEntity)
    candidates = list_all_connections(
        pg_id=component.component.parent_group_id
    )
    related = []
    for con in candidates:
        # keep the connection if the component is at either end of it
        if component.id in [con.destination_id, con.source_id]:
            related.append(con)
    return related
977

978

979
def purge_connection(con_id):
    """
    EXPERIMENTAL
    Drops all FlowFiles in a given connection. Waits until the action is
    complete before returning.

    Note that if upstream component isn't stopped, more data may flow into
    the connection after this action.

    Args:
        con_id (str): The UUID of the Connection to be purged

    Returns:
        The value handed back by nipyapi.utils.wait_to_complete for the
        drop request poller.
        NOTE(review): this was documented as a DropRequestEntity, but the
        poller below only ever yields True or False - confirm against
        wait_to_complete.

    Raises:
        ValueError: Raised by the poller when the finished drop request
            carries a failure_reason

    """

    def _autumn_leaves(con_id_, drop_request_):
        # test_obj is the inner drop_request object, not the wrapping
        # entity, so its fields are read directly from it
        test_obj = nipyapi.nifi.FlowfileQueuesApi().get_drop_request(
            con_id_,
            drop_request_.drop_request.id
        ).drop_request
        if not test_obj.finished:
            return False
        if test_obj.failure_reason:
            raise ValueError(
                "Unable to complete drop request{0}, error was {1}"
                .format(test_obj, test_obj.failure_reason)
            )
        return True

    with nipyapi.utils.rest_exceptions():
        drop_req = nipyapi.nifi.FlowfileQueuesApi().create_drop_request(con_id)
    assert isinstance(drop_req, nipyapi.nifi.DropRequestEntity)
    return nipyapi.utils.wait_to_complete(_autumn_leaves, con_id, drop_req)
1✔
1015

1016

1017
def purge_process_group(process_group, stop=False):
    """
    EXPERIMENTAL
    Purges FlowFiles from the connections listed for a given Process
    Group, optionally stopping the Process Group first

    Args:
        process_group (ProcessGroupEntity): Target Process Group
        stop (Optional [bool]): Whether to stop the Process Group before action

    Returns:
        (list[dict]): One single-entry dict per connection, mapping the
    Connection ID to the str() of the purge_connection result

    Raises:
        ValueError: If stop was requested and the Process Group could not
            be stopped

    """
    assert isinstance(process_group, nipyapi.nifi.ProcessGroupEntity)
    assert isinstance(stop, bool)
    if stop and not schedule_process_group(process_group.id, False):
        raise ValueError(
            "Unable to stop Process Group {0} for purging"
            .format(process_group.id)
        )
    return [
        {con.id: str(purge_connection(con.id))}
        for con in list_all_connections(process_group.id)
    ]
1✔
1045

1046

1047
def get_bulletins():
    """
    Fetches the current bulletins (alerts) from the Flow Canvas

    Returns:
        (ControllerBulletinsEntity): The native datatype containing a list
    of bulletins
    """
    with nipyapi.utils.rest_exceptions():
        flow_api = nipyapi.nifi.FlowApi()
        return flow_api.get_bulletins()
1✔
1057

1058

1059
def get_bulletin_board():
    """
    Fetches the bulletin board object

    Returns:
        (BulletinBoardEntity): The native datatype BulletinBoard object
    """
    with nipyapi.utils.rest_exceptions():
        flow_api = nipyapi.nifi.FlowApi()
        return flow_api.get_bulletin_board()
1✔
1068

1069

1070
def create_controller(parent_pg, controller, name=None):
    """
    Creates a new Controller Service in a given Process Group of the given
        Controller type, with the given Name

    Args:
        parent_pg (ProcessGroupEntity): Target Parent PG
        controller (DocumentedTypeDTO): Type of Controller to create, found
            via the list_all_controller_types method
        name (str[Optional]): Name for the new Controller as a String. It
            is applied with a follow-up update_controller call after the
            service has been created

    Returns:
        (ControllerServiceEntity): The created Controller; when a name was
        supplied this is the entity returned by the rename update

    """
    assert isinstance(controller, nipyapi.nifi.DocumentedTypeDTO)
    assert isinstance(parent_pg, nipyapi.nifi.ProcessGroupEntity)
    assert name is None or isinstance(name, six.string_types)
    with nipyapi.utils.rest_exceptions():
        out = nipyapi.nifi.ProcessGroupsApi().create_controller_service(
            id=parent_pg.id,
            body=nipyapi.nifi.ControllerServiceEntity(
                revision={'version': 0},
                component=nipyapi.nifi.ControllerServiceDTO(
                    bundle=controller.bundle,
                    type=controller.type
                )
            ),
        )
        if name:
            # Keep the update result: the entity from the create call
            # predates the rename and would be returned without the name
            out = update_controller(
                out,
                nipyapi.nifi.ControllerServiceDTO(
                    name=name
                )
            )
    return out
1✔
1107

1108

1109
def list_all_controllers(pg_id='root', descendants=True, include_reporting_tasks=False):
    """
    Lists all controllers under a given Process Group, defaults to Root
        Optionally recurses all child Process Groups as well
    Args:
        pg_id (str): String of the ID of the Process Group to list from
        descendants (bool): True to recurse child PGs, False to not
        include_reporting_tasks (bool): True to also append the services
            returned by get_controller_services_from_controller, False to
            not

    Returns:
        list(ControllerServiceEntity)
        NOTE(review): the original docstring also listed None and a single
        ControllerServiceEntity as possible results - confirm what the API
        yields for a group without any controller services

    """
    assert isinstance(pg_id, six.string_types)
    assert isinstance(descendants, bool)
    handle = nipyapi.nifi.FlowApi()
    # Testing shows that descendant doesn't work on NiFi-1.1.2
    # Or 1.2.0, despite the descendants option being available
    if nipyapi.utils.check_version('1.2.0') >= 0:
        # Case where NiFi <= 1.2.0
        out = []
        if descendants:
            pgs = list_all_process_groups(pg_id)
        else:
            pgs = [get_process_group(pg_id, 'id')]
        for pg in pgs:
            new_conts = handle.get_controller_services_from_group(
                pg.id).controller_services
            # trim duplicates from inheritance; the IDs collected so far
            # are gathered once per group rather than once per candidate
            seen_ids = {y.id for y in out}
            out += [x for x in new_conts if x.id not in seen_ids]
    else:
        # Case where NiFi > 1.2.0
        # duplicate trim already handled by server
        out = handle.get_controller_services_from_group(
            pg_id,
            include_descendant_groups=descendants
        ).controller_services
    if include_reporting_tasks:
        # the same FlowApi handle serves the controller-level listing
        out += handle.get_controller_services_from_controller().controller_services
    return out
1✔
1153

1154

1155
def delete_controller(controller, force=False):
    """
    Removes a Controller service, optionally disabling it first

    Args:
        controller (ControllerServiceEntity): Target Controller to delete
        force (bool): True to attempt Disable the Controller before deletion

    Returns:
        (ControllerServiceEntity)

    Raises:
        ValueError: If the Controller can still be fetched once the wait
            for its deletion has ended

    """
    assert isinstance(controller, nipyapi.nifi.ControllerServiceEntity)
    assert isinstance(force, bool)

    def _del_cont(cont_id):
        # deletion is complete once the lookup no longer finds anything
        return not get_controller(cont_id, 'id', bool_response=True)

    handle = nipyapi.nifi.ControllerServicesApi()
    if force:
        # Stop and refresh
        controller = schedule_controller(controller, False, True)
    with nipyapi.utils.rest_exceptions():
        result = handle.remove_controller_service(
            id=controller.id,
            version=controller.revision.version
        )
    deleted = nipyapi.utils.wait_to_complete(
        _del_cont,
        controller.id,
        nipyapi_max_wait=15,
        nipyapi_delay=1
    )
    if deleted:
        return result
    raise ValueError("Timed out waiting for Controller Deletion")
1✔
1193

1194

1195
def update_controller(controller, update):
    """
    Applies a new Configuration to a Controller Service

    Args:
        controller (ControllerServiceEntity): Target Controller to update
        update (ControllerServiceDTO): Controller Service configuration object
            containing the new config params and properties. Its id
            attribute is overwritten with the ID of the target Controller

    Returns:
        (ControllerServiceEntity)

    """
    assert isinstance(controller, nipyapi.nifi.ControllerServiceEntity)
    assert isinstance(update, nipyapi.nifi.ControllerServiceDTO)
    # The update DTO has to carry the ID of the Controller it applies to
    update.id = controller.id
    payload = nipyapi.nifi.ControllerServiceEntity(
        component=update,
        revision=controller.revision,
        id=controller.id
    )
    return nipyapi.nifi.ControllerServicesApi().update_controller_service(
        id=controller.id,
        body=payload
    )
1220

1221

1222
def schedule_controller(controller, scheduled, refresh=False):
    """
    Start/Enable or Stop/Disable a Controller Service

    Args:
        controller (ControllerServiceEntity): Target Controller to schedule
        scheduled (bool): True to request ENABLED, False to request
          DISABLED
        refresh (bool): Whether to re-fetch the Controller, and with it
          its revision, before execution

    Returns:
        (ControllerServiceEntity): The Controller, fetched again once it
        reports the requested state

    Raises:
        ValueError: If the scheduling request returns nothing, or the
            requested state is not observed before the wait ends

    """
    assert isinstance(controller, nipyapi.nifi.ControllerServiceEntity)
    assert isinstance(scheduled, bool)

    def _schedule_controller_state(cont_id, tgt_state):
        return get_controller(cont_id, 'id').component.state == tgt_state

    handle = nipyapi.nifi.ControllerServicesApi()
    if scheduled:
        target_state = 'ENABLED'
    else:
        target_state = 'DISABLED'
    if refresh:
        controller = nipyapi.canvas.get_controller(controller.id, 'id')
        assert isinstance(controller, nipyapi.nifi.ControllerServiceEntity)
    if nipyapi.utils.check_version('1.2.0') < 0:
        # Case where NiFi > 1.2.0
        run_status = nipyapi.nifi.ControllerServiceRunStatusEntity(
            revision=controller.revision,
            state=target_state
        )
        result = handle.update_run_status(
            id=controller.id,
            body=run_status
        )
    else:
        # Case where NiFi <= 1.2.0
        result = update_controller(
            controller=controller,
            update=nipyapi.nifi.ControllerServiceDTO(
                state=target_state
            )
        )
    if not result:
        raise ValueError("Scheduling request failed")
    state_test = nipyapi.utils.wait_to_complete(
        _schedule_controller_state,
        controller.id,
        target_state,
        nipyapi_delay=nipyapi.config.long_retry_delay,
        nipyapi_max_wait=nipyapi.config.long_max_wait
    )
    if not state_test:
        raise ValueError("Scheduling request timed out")
    return get_controller(controller.id, 'id')
×
1279

1280

1281
def get_controller(identifier, identifier_type='name', bool_response=False,
                   include_reporting_tasks=True):
    """
    Looks up a Controller by ID or by Name

    Args:
        identifier (str): ID or Name of a Controller to find
        identifier_type (str): 'id' or 'name', defaults to name
        bool_response (bool): If True, False is returned instead of an
            error being raised when the API call fails - useful when
            testing for deletion completion
        include_reporting_tasks (bool): Forwarded to list_all_controllers
            when searching by name

    Returns:
        The result of the direct fetch for an 'id' lookup, the result of
        nipyapi.utils.filter_obj for a 'name' lookup, or False when the
        API call failed and bool_response is set

    Raises:
        ValueError: If the API call failed and bool_response is not set

    """
    assert isinstance(identifier, six.string_types)
    assert identifier_type in ['name', 'id']
    handle = nipyapi.nifi.ControllerServicesApi()
    try:
        if identifier_type != 'id':
            candidates = list_all_controllers(
                include_reporting_tasks=include_reporting_tasks
            )
            found = nipyapi.utils.filter_obj(
                candidates, identifier, identifier_type
            )
        else:
            found = handle.get_controller_service(identifier)
    except nipyapi.nifi.rest.ApiException as err:
        if not bool_response:
            _raise(ValueError(err.body), err)
        return False
    return found
1✔
1311

1312

1313
def list_all_controller_types():
    """
    Fetches every Controller Service type available on the environment

    Returns:
        list(DocumentedTypeDTO)
    """
    type_listing = nipyapi.nifi.FlowApi().get_controller_service_types()
    return type_listing.controller_service_types
1✔
1322

1323

1324
def get_controller_type(identifier, identifier_type='name', greedy=True):
    """
    Finds the abstract object describing a controller, or list thereof

    Args:
        identifier (str): the string to filter the list for
        identifier_type (str): the field to filter on, set in config.py
        greedy (bool): False for exact match, True for greedy match

    Returns:
        None for no matches, Single Object for unique match,
        list(Objects) for multiple matches

    """
    with nipyapi.utils.rest_exceptions():
        known_types = list_all_controller_types()
    if not known_types:
        # nothing to filter, so the empty listing is handed back as-is
        return known_types
    return nipyapi.utils.filter_obj(
        known_types, identifier, identifier_type, greedy=greedy
    )
×
1345

1346

1347
def list_all_by_kind(kind, pg_id='root', descendants=True):
    """
    Collects all instances of a supported object type

    Args:
        kind (str):  one of input_ports, output_ports, funnels, controllers,
            connections, remote_process_groups
        pg_id (str): optional, ID of the Process Group to use as search base
        descendants (bool): optional, whether to collect child group info

    Returns:
        list of the Entity type of the kind; for controllers, whatever
        list_all_controllers returns

    """
    supported_kinds = [
        'input_ports', 'output_ports', 'funnels', 'controllers', 'connections',
        'remote_process_groups'
    ]
    assert kind in supported_kinds
    if kind == 'controllers':
        return list_all_controllers(pg_id, descendants)
    handle = nipyapi.nifi.ProcessGroupsApi()
    # both the API method and the attribute on its response are named
    # after the kind
    fetch_kind = getattr(handle, 'get_' + kind)
    if not descendants:
        pgs = [get_process_group(pg_id, 'id')]
    else:
        pgs = list_all_process_groups(pg_id)
    collected = []
    for pg in pgs:
        collected.extend(getattr(fetch_kind(pg.id), kind))
    return collected
1✔
1377

1378

1379
def list_all_input_ports(pg_id='root', descendants=True):
    """Shortcut to list_all_by_kind with the kind fixed to input_ports"""
    return list_all_by_kind(
        kind='input_ports', pg_id=pg_id, descendants=descendants
    )
×
1382

1383

1384
def list_all_output_ports(pg_id='root', descendants=True):
    """Shortcut to list_all_by_kind with the kind fixed to output_ports"""
    return list_all_by_kind(
        kind='output_ports', pg_id=pg_id, descendants=descendants
    )
×
1387

1388

1389
def list_all_funnels(pg_id='root', descendants=True):
    """Shortcut to list_all_by_kind with the kind fixed to funnels"""
    return list_all_by_kind(
        kind='funnels', pg_id=pg_id, descendants=descendants
    )
1✔
1392

1393

1394
def list_all_remote_process_groups(pg_id='root', descendants=True):
    """Shortcut to list_all_by_kind with the kind fixed to
    remote_process_groups"""
    return list_all_by_kind(
        kind='remote_process_groups', pg_id=pg_id, descendants=descendants
    )
1✔
1397

1398

1399
def get_remote_process_group(rpg_id, summary=False):
    """
    Fetch a remote process group object, with optional summary of just ports

    Args:
        rpg_id (str): ID of the remote process group to fetch
        summary (bool): True to return only a dict with the keys id,
            input_ports and output_ports instead of the full object

    Returns:
        The fetched remote process group object, or the summary dict
    """
    rpg = nipyapi.nifi.RemoteProcessGroupsApi().get_remote_process_group(
        rpg_id
    )
    if summary:
        contents = rpg.component.contents
        return {
            'id': rpg.id,
            'input_ports': contents.input_ports,
            'output_ports': contents.output_ports
        }
    return rpg
1✔
1415

1416

1417
def create_remote_process_group(target_uris, transport='RAW', pg_id='root',
                                position=None):
    """
    Creates a new Remote Process Group with given parameters

    Args:
        target_uris (str): Comma separated list of target URIs
        transport (str): optional, RAW or HTTP
        pg_id (str): optional, UUID of parent Process Group for remote
          process group; the default 'root' is resolved to the ID of the
          root Process Group
        position (tuple): optional, tuple of location ints, defaults to
          (400, 400)

    Returns:
        (RemoteProcessGroupEntity)
    """
    assert isinstance(target_uris, str)
    assert transport in ['RAW', 'HTTP']
    assert isinstance(pg_id, str)
    # Compare against the argument itself: testing the bare literal
    # (not 'root') is always False and sent every call to the root group
    pg_id = pg_id if pg_id != 'root' else get_root_pg_id()
    position = position if position else (400, 400)
    assert isinstance(position, tuple)
    with nipyapi.utils.rest_exceptions():
        return nipyapi.nifi.ProcessGroupsApi().create_remote_process_group(
            id=pg_id,
            body=nipyapi.nifi.RemoteProcessGroupEntity(
                component=nipyapi.nifi.RemoteProcessGroupDTO(
                    position=nipyapi.nifi.PositionDTO(
                        x=float(position[0]),
                        y=float(position[1])
                    ),
                    target_uris=target_uris,
                    transport_protocol=transport
                ),
                revision=nipyapi.nifi.RevisionDTO(version=0),
            )
        )
1453

1454

1455
def delete_remote_process_group(rpg, refresh=True):
    """
    Removes a given remote process group

    Args:
        rpg (RemoteProcessGroupEntity): Remote Process Group to remove
        refresh (bool): Whether to re-fetch the object, and with it its
            revision, before action

    Returns:
        (RemoteProcessGroupEntity)
    """
    assert isinstance(rpg, nipyapi.nifi.RemoteProcessGroupEntity)
    target = get_remote_process_group(rpg.id) if refresh else rpg
    handle = nipyapi.nifi.RemoteProcessGroupsApi()
    with nipyapi.utils.rest_exceptions():
        return handle.remove_remote_process_group(
            id=target.id,
            version=target.revision.version
        )
1475

1476

1477
def set_remote_process_group_transmission(rpg, enable=True, refresh=True):
    """
    Enable or Disable Transmission for an RPG

    Args:
        rpg (RemoteProcessGroupEntity): The remote process group to modify
        enable (bool): True to request the TRANSMITTING state, False to
          request STOPPED
        refresh (bool): Whether to re-fetch the object, and with it its
          revision, before action

    Returns:
        The result of the update_remote_process_group_run_status API call

    """
    assert isinstance(rpg, nipyapi.nifi.RemoteProcessGroupEntity)
    assert isinstance(enable, bool)
    target = get_remote_process_group(rpg.id) if refresh else rpg
    handle = nipyapi.nifi.RemoteProcessGroupsApi()
    with nipyapi.utils.rest_exceptions():
        if enable:
            wanted_state = 'TRANSMITTING'
        else:
            wanted_state = 'STOPPED'
        return handle.update_remote_process_group_run_status(
            id=target.id,
            body=nipyapi.nifi.RemotePortRunStatusEntity(
                state=wanted_state,
                revision=target.revision
            )
        )
1503

1504

1505
def create_port(pg_id, port_type, name, state, position=None):
    """
    Creates a new input or output port of given characteristics

    Args:
        pg_id (str): ID of the parent Process Group
        port_type (str): Either of INPUT_PORT or OUTPUT_PORT
        name (str): Name to assign to the port
        state (str): One of RUNNING, STOPPED, DISABLED
        position (tuple): optional, tuple of ints like (400, 400), which
            is also the default

    Returns:
        (PortEntity) of the created port

    """
    # NOTE(review): state is validated here but is not part of the PortDTO
    # sent below - confirm whether it is meant to be applied
    assert state in ["RUNNING", "STOPPED", "DISABLED"]
    assert port_type in ["INPUT_PORT", "OUTPUT_PORT"]
    assert isinstance(pg_id, six.string_types)
    if not position:
        position = (400, 400)
    assert isinstance(position, tuple)
    handle = nipyapi.nifi.ProcessGroupsApi()
    # resolves to create_input_port or create_output_port
    port_generator = getattr(handle, 'create_' + port_type.lower())
    with nipyapi.utils.rest_exceptions():
        location = nipyapi.nifi.PositionDTO(
            x=float(position[0]),
            y=float(position[1])
        )
        return port_generator(
            id=pg_id,
            body=nipyapi.nifi.PortEntity(
                revision=nipyapi.nifi.RevisionDTO(version=0),
                component=nipyapi.nifi.PortDTO(
                    parent_group_id=pg_id,
                    position=location,
                    name=name
                )
            )
        )
1542

1543

1544
def delete_port(port):
    """
    Deletes a given port from the canvas if possible

    Args:
        port (PortEntity): The port to remove; its port_type decides
            whether the input or the output port API is used

    Returns:
        The result of the matching remove call, or None when port_type
        contains neither INPUT nor OUTPUT
    """
    assert isinstance(port, nipyapi.nifi.PortEntity)
    if 'INPUT' in port.port_type:
        with nipyapi.utils.rest_exceptions():
            input_api = nipyapi.nifi.InputPortsApi()
            return input_api.remove_input_port(
                id=port.id,
                version=port.revision.version
            )
    if 'OUTPUT' in port.port_type:
        with nipyapi.utils.rest_exceptions():
            output_api = nipyapi.nifi.OutputPortsApi()
            return output_api.remove_output_port(
                id=port.id,
                version=port.revision.version
            )
    return None
1557

1558

1559
def get_funnel(funnel_id):
    """Fetches a single Funnel using its ID"""
    with nipyapi.utils.rest_exceptions():
        funnel_api = nipyapi.nifi.FunnelApi()
        return funnel_api.get_funnel(funnel_id)
1✔
1563

1564

1565
def create_funnel(pg_id, position=None):
    """
    Creates a Funnel Object

    Args:
        pg_id (str): ID of the parent Process Group
        position (tuple[int, int]): Position on canvas, defaults to
            (400, 400)

    Returns:
        (FunnelEntity) Created Funnel
    """
    if not position:
        position = (400, 400)
    assert isinstance(position, tuple)
    with nipyapi.utils.rest_exceptions():
        location = nipyapi.nifi.PositionDTO(
            x=float(position[0]),
            y=float(position[1])
        )
        return nipyapi.nifi.ProcessGroupsApi().create_funnel(
            id=pg_id,
            body=nipyapi.nifi.FunnelEntity(
                revision=nipyapi.nifi.RevisionDTO(version=0),
                component=nipyapi.nifi.FunnelDTO(
                    parent_group_id=pg_id,
                    position=location,
                )
            )
        )
1592

1593

1594
def delete_funnel(funnel, refresh=True):
    """
    Removes a Funnel Object

    Args:
        funnel (FunnelEntity): The Funnel to delete
        refresh (bool): Whether to re-fetch the object, and with it its
            revision, before execution

    Returns:
        (FunnelEntity) Deleted FunnelEntity reference
    """
    assert isinstance(funnel, nipyapi.nifi.FunnelEntity)
    with nipyapi.utils.rest_exceptions():
        target = get_funnel(funnel.id) if refresh else funnel
        return nipyapi.nifi.FunnelApi().remove_funnel(
            id=target.id,
            version=target.revision.version
        )
1614

1615

1616
def get_pg_parents_ids(pg_id):
    """
    Retrieve the ids of the parent Process Groups.

    The chain is followed through each group's parent_group_id until a
    group reports no parent.

    Args:
        pg_id (str): Process group id

    Returns:
        (list) List of ids of the input PG parents, nearest parent first;
        empty when pg_id is empty or None
    """
    parent_groups = []
    current_id = pg_id
    while current_id:
        current_id = nipyapi.canvas.get_process_group(current_id, 'id') \
            .component.parent_group_id
        # The terminating empty value is never stored, so there is nothing
        # to pop afterwards and an empty pg_id simply yields an empty list
        if current_id:
            parent_groups.append(current_id)
    return parent_groups
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc