• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

metauto-ai / GPTSwarm / 8409699437

24 Mar 2024 01:32PM UTC coverage: 59.285% (+0.7%) from 58.578%
8409699437

Pull #21

github

web-flow
Merge 2bd615d11 into bc70cecbc
Pull Request #21: Add specialist agent that aswers as if it were a <role>

93 of 104 new or added lines in 5 files covered. (89.42%)

19 existing lines in 3 files now uncovered.

2688 of 4534 relevant lines covered (59.29%)

0.59 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

68.93
/swarm/graph/node.py
1
#!/usr/bin/env python
2
# -*- coding: utf-8 -*-
3

4
import shortuuid
1✔
5
import asyncio
1✔
6
from typing import List, Any, Optional
1✔
7
from abc import ABC, abstractmethod
1✔
8
import warnings
1✔
9

10
from swarm.memory import GlobalMemory
1✔
11
from swarm.utils.log import logger
1✔
12
import pdb
1✔
13

14

15
class Node(ABC):
1✔
16
    """
17
    Represents a processing unit within a graph-based framework.
18

19
    This class encapsulates the functionality for a node in a graph, managing
20
    connections to other nodes, handling inputs and outputs, and executing
21
    assigned operations asynchronously. It supports both individual and
22
    aggregated processing modes.
23

24
    Attributes:
25
        id (uuid.UUID): Unique identifier for the node.
26
        agent: Associated agent for node-specific operations.
27
        operation_description (str): Brief description of the node's operation.
28
        predecessors (List[Node]): Nodes that precede this node in the graph.
29
        successors (List[Node]): Nodes that succeed this node in the graph.
30
        inputs (List[Any]): Inputs to be processed by the node.
31
        outputs (List[Any]): Results produced after node execution.
32
        is_aggregate (bool): Indicates if node aggregates inputs before processing.
33

34
    Methods:
35
        add_predecessor(operation): 
36
            Adds a node as a predecessor of this node, establishing a directed connection.
37
        add_successor(operation): 
38
            Adds a node as a successor of this node, establishing a directed connection.
39
        execute(**kwargs): 
40
            Asynchronously processes the inputs through the node's operation, handling each input individually.
41
        _execute(input, **kwargs): 
42
            An internal method that defines how a single input is processed by the node. This method should be implemented specifically for each node type.
43
    """
44

45
    def __init__(self, #agent: Type, 
1✔
46
                 operation_description: str, 
1✔
47
                 id: Optional[str], combine_inputs_as_one: bool,
1✔
48
                 ):
×
49
        """
×
50
        Initializes a new Node instance.
×
51
        """
×
52
        self.id = id if id is not None else shortuuid.ShortUUID().random(length=4)
1✔
53
        self.memory = GlobalMemory.instance()
1✔
54
        self.operation_description = operation_description
1✔
55
        self.predecessors: List[Node] = []
1✔
56
        self.successors: List[Node] = []
1✔
57
        self.inputs: List[Any] = []
1✔
58
        self.outputs: List[Any] = []
1✔
59
        self.combine_inputs_as_one = combine_inputs_as_one
1✔
60

61
    @property
1✔
62
    def node_name(self):
1✔
63
        return self.__class__.__name__
×
64
    
65
    def add_predecessor(self, operation: 'Node'):
1✔
66

67
        if operation not in self.predecessors:
1✔
68
            self.predecessors.append(operation)
1✔
69
            operation.successors.append(self)
1✔
70

71
    def add_successor(self, operation: 'Node'):
1✔
72

73
        if operation not in self.successors:
1✔
74
            self.successors.append(operation)
1✔
75
            operation.predecessors.append(self)
1✔
76

77
    def remove_predecessor(self, operation: 'Node'):
1✔
78
        if operation in self.predecessors:
×
79
            self.predecessors.remove(operation)
×
80
            operation.successors.remove(self)
×
81

82
    def remove_successor(self, operation: 'Node'):
1✔
83
        if operation in self.successors:
×
84
            self.successors.remove(operation)
×
85
            operation.predecessors.remove(self)
×
86

87
    def process_input(self, inputs):
1✔
88

89
        all_inputs = []
1✔
90
        if inputs is None:
1✔
91
            if self.predecessors:
×
92

93
                for predecessor in self.predecessors:
×
94
                    predecessor_input = self.memory.query_by_id(predecessor.id)
×
95

96
                    if isinstance(predecessor_input, list) and predecessor_input:
×
97
                        predecessor_input = predecessor_input[-1]
×
98
                        all_inputs.append(predecessor_input)
×
99
                inputs = all_inputs
×
100
            else:
×
101
                raise ValueError("Input must be provided either directly or from predecessors.")
×
102
            
103
        elif not isinstance(inputs, list):
1✔
104

105
            inputs = [inputs]
1✔
106

107
        return inputs
1✔
108

109
    async def execute(self, **kwargs):
1✔
110

111
        self.outputs = []
1✔
112
        tasks = []
1✔
113
        if not self.inputs and self.predecessors:
1✔
114
            if self.combine_inputs_as_one:
1✔
115
                combined_inputs = []
1✔
116
                for predecessor in self.predecessors:
1✔
117
                    predecessor_outputs = predecessor.outputs
1✔
118
                    if predecessor_outputs is not None and isinstance(predecessor_outputs, list):
1✔
119
                        combined_inputs.extend(predecessor_outputs)
1✔
120
                tasks.append(asyncio.create_task(self._execute(combined_inputs, **kwargs)))
1✔
UNCOV
121
            else:
×
UNCOV
122
                for predecessor in self.predecessors:
×
123
                    predecessor_outputs = predecessor.outputs
×
124
                    if isinstance(predecessor_outputs, list) and predecessor_outputs:
×
125
                        for predecessor_output in predecessor_outputs:
×
126
                            tasks.append(asyncio.create_task(self._execute(predecessor_output, **kwargs)))
×
127
        elif self.inputs:
1✔
128
            tasks = [asyncio.create_task(self._execute(input, **kwargs)) for input in self.inputs]
1✔
129
        else:
×
130
            warnings.warn("No input received.")
1✔
131
            return
1✔
132

133
        if tasks:
1✔
134
            results = await asyncio.gather(*tasks, return_exceptions=True)
1✔
135
            for result in results:
1✔
136
                if not isinstance(result, Exception):
1✔
137
                    if not isinstance(result, list):
1✔
138
                        result = [result]
1✔
139
                    self.outputs.extend(result)
1✔
UNCOV
140
                else:
×
UNCOV
141
                    logger.error(f"Node {type(self).__name__} failed to execute due to: {result.__class__.__name__}: {result}")
×
142

143
    @abstractmethod
1✔
144
    async def _execute(self, input, **kwargs):
1✔
UNCOV
145
        """ To be overriden by the descendant class """
×
146

147
    def log(self):
1✔
148

149
        items_for_id = self.memory.query_by_id(self.id)
1✔
150

151
        if items_for_id:
1✔
152
            last_item = items_for_id[-1]
1✔
UNCOV
153
        else:
×
UNCOV
154
            last_item = {}
×
155

156
        ignore_keys = ['task', 'input', 'format'] 
1✔
157
        formatted_items = '\n    '.join(
1✔
158
            f"\033[1;34m{key}\033[0m: {value}" for key, value in last_item.items() if key not in ignore_keys)
1✔
159
        formatted_output = f"Memory Records for ID \033[1;35m{self.id}\033[0m:\n    {formatted_items}"
1✔
160
        logger.info(formatted_output)
1✔
161

STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc