• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

vertti / lazy-ecs / 20895604377

11 Jan 2026 01:04PM UTC coverage: 90.038% (+0.06%) from 89.981%
20895604377

Pull #106

github

web-flow
Merge 69825d42f into 98985b196
Pull Request #106: feat: add stop task functionality

18 of 19 new or added lines in 3 files covered. (94.74%)

21 existing lines in 3 files now uncovered.

1437 of 1596 relevant lines covered (90.04%)

0.9 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.43
/src/lazy_ecs/features/task/task.py
1
"""Task operations for ECS."""
2

3
from __future__ import annotations
1✔
4

5
from typing import TYPE_CHECKING
1✔
6

7
from ...core.base import BaseAWSService
1✔
8
from ...core.types import TaskDetails, TaskHistoryDetails, TaskInfo
1✔
9
from ...core.utils import batch_items, paginate_aws_list
1✔
10

11
if TYPE_CHECKING:
12
    from mypy_boto3_ecs.client import ECSClient
13
    from mypy_boto3_ecs.type_defs import TaskDefinitionTypeDef, TaskTypeDef
14

15

16
class TaskService(BaseAWSService):
    """Service for ECS task operations.

    Wraps the ECS client calls used to list, describe, stop and diagnose
    tasks, and converts the raw API responses into the project's
    ``TaskInfo`` / ``TaskDetails`` / ``TaskHistoryDetails`` structures.
    """

    # describe_tasks is always called with at most this many ARNs per request.
    _DESCRIBE_TASKS_BATCH_SIZE = 100

    def __init__(self, ecs_client: ECSClient) -> None:
        super().__init__(ecs_client)

    def get_tasks(self, cluster_name: str, service_name: str) -> list[str]:
        """Return the ARNs of all tasks listed for a service in a cluster."""
        return paginate_aws_list(
            self.ecs_client,
            "list_tasks",
            "taskArns",
            cluster=cluster_name,
            serviceName=service_name,
        )

    def stop_task(self, cluster_name: str, task_arn: str, reason: str = "Stopped via lazy-ecs") -> bool:
        """Ask ECS to stop a task.

        This is a deliberate best-effort call: any exception raised by the
        client is swallowed and reported to the caller as ``False`` instead
        of propagating.

        Returns:
            ``True`` when the stop request was accepted, ``False`` when the
            client call raised.
        """
        try:
            self.ecs_client.stop_task(cluster=cluster_name, task=task_arn, reason=reason)
        except Exception:
            return False
        return True

    def get_task_info(self, cluster_name: str, service_name: str, desired_task_def_arn: str | None) -> list[TaskInfo]:
        """Return display-ready info for every task of a service.

        Each task is flagged as desired when its task definition ARN equals
        ``desired_task_def_arn``. Returns an empty list when the service has
        no tasks.
        """
        task_arns = self.get_tasks(cluster_name, service_name)
        if not task_arns:
            return []

        return [
            _create_task_info(task, desired_task_def_arn) for task in self._describe_tasks(cluster_name, task_arns)
        ]

    def get_task_details(
        self,
        cluster_name: str,
        task_arn: str,
        desired_task_def_arn: str | None,
    ) -> TaskDetails | None:
        """Return detailed information for one task.

        Returns ``None`` when either the task or its task definition cannot
        be retrieved.
        """
        result = self.get_task_and_definition(cluster_name, task_arn)
        if not result:
            return None

        task, task_definition = result
        is_desired_version = task["taskDefinitionArn"] == desired_task_def_arn
        return _build_task_details(task, task_definition, is_desired_version)

    def get_task_and_definition(
        self,
        cluster_name: str,
        task_arn: str,
    ) -> tuple[TaskTypeDef, TaskDefinitionTypeDef] | None:
        """Fetch a task together with the task definition it runs.

        Returns ``None`` when ``describe_tasks`` yields no task or when the
        ``describe_task_definition`` response carries no definition.
        """
        task_response = self.ecs_client.describe_tasks(cluster=cluster_name, tasks=[task_arn])
        tasks = task_response.get("tasks", [])
        if not tasks:
            return None

        task = tasks[0]
        task_def_response = self.ecs_client.describe_task_definition(taskDefinition=task["taskDefinitionArn"])
        task_definition = task_def_response.get("taskDefinition")
        if not task_definition:
            return None

        return task, task_definition

    def _describe_tasks(self, cluster_name: str, task_arns: list[str]) -> list[TaskTypeDef]:
        """Describe tasks in batches and return the flattened task list."""
        return [
            task
            for batch in batch_items(task_arns, self._DESCRIBE_TASKS_BATCH_SIZE)
            for task in self.ecs_client.describe_tasks(cluster=cluster_name, tasks=batch).get("tasks", [])
        ]

    def _list_tasks_by_status(self, cluster_name: str, service_name: str | None, desired_status: str) -> list[str]:
        """List tasks filtered by status and optional service name."""
        kwargs: dict[str, str] = {"cluster": cluster_name, "desiredStatus": desired_status}
        # Only filter by service when one was given; otherwise list cluster-wide.
        if service_name:
            kwargs["serviceName"] = service_name
        return paginate_aws_list(self.ecs_client, "list_tasks", "taskArns", **kwargs)

    def get_task_history(self, cluster_name: str, service_name: str | None = None) -> list[TaskHistoryDetails]:
        """Get task history including stopped tasks with failure information.

        Running tasks are listed first, followed by stopped tasks. Returns
        an empty list when neither query finds any task.
        """
        running_arns = self._list_tasks_by_status(cluster_name, service_name, "RUNNING")
        stopped_arns = self._list_tasks_by_status(cluster_name, service_name, "STOPPED")
        task_arns = [*running_arns, *stopped_arns]

        if not task_arns:
            return []

        return [self._parse_task_history(task) for task in self._describe_tasks(cluster_name, task_arns)]

    def get_task_failure_analysis(self, task_history: TaskHistoryDetails) -> str:
        """Analyze task failure and provide human-readable explanation.

        A running task is reported as such. Otherwise the first container
        with a non-zero exit code determines the message; when no container
        failed, the task-level stop code and reason are analyzed instead.
        """
        if task_history["last_status"] == "RUNNING":
            return "✅ Task is currently running"

        stop_code = task_history["stop_code"]
        stopped_reason = task_history["stopped_reason"]

        # Check container exit codes
        for container in task_history["containers"]:
            exit_code = container["exit_code"]
            # None means no exit code was reported; 0 is a clean exit.
            if exit_code is not None and exit_code != 0:
                return self._analyze_container_failure(
                    container["name"],
                    exit_code,
                    container["reason"],
                    stop_code,
                    stopped_reason,
                )

        # No container failures, analyze task-level issues
        return self._analyze_task_failure(stop_code, stopped_reason)

    @staticmethod
    def _parse_task_history(task: TaskTypeDef) -> TaskHistoryDetails:
        """Parse task data into TaskHistoryDetails structure."""
        task_def_arn = task["taskDefinitionArn"]
        # The ARN ends in ".../<family>:<revision>".
        task_def_family = task_def_arn.split("/")[-1].split(":")[0]
        task_def_revision = task_def_arn.split(":")[-1]

        containers = [
            {
                "name": container["name"],
                "exit_code": container.get("exitCode"),
                "reason": container.get("reason"),
                "health_status": container.get("healthStatus"),
                "last_status": container.get("lastStatus", "UNKNOWN"),
            }
            for container in task.get("containers", [])
        ]

        return {
            "task_arn": task["taskArn"],
            "task_definition_name": task_def_family,
            "task_definition_revision": task_def_revision,
            "last_status": task.get("lastStatus", "UNKNOWN"),
            "desired_status": task.get("desiredStatus", "UNKNOWN"),
            "stop_code": task.get("stopCode"),
            "stopped_reason": task.get("stoppedReason"),
            "created_at": task.get("createdAt"),
            "started_at": task.get("startedAt"),
            "stopped_at": task.get("stoppedAt"),
            "containers": containers,
        }

    @staticmethod
    def _analyze_container_failure(
        container_name: str,
        exit_code: int,
        container_reason: str | None,
        _stop_code: str | None,
        _stopped_reason: str | None,
    ) -> str:
        """Analyze container-level failure.

        ``_stop_code`` and ``_stopped_reason`` are accepted for signature
        compatibility but are not used in the message.
        """
        if exit_code == 137:
            if container_reason and "OutOfMemoryError" in container_reason:
                return f"🔴 Container '{container_name}' killed due to out of memory (OOM)"
            return f"⏰ Container '{container_name}' killed after timeout (exit code 137)"
        if exit_code == 139:
            return f"💥 Container '{container_name}' crashed with segmentation fault (exit code 139)"
        if exit_code == 143:
            return f"🛑 Container '{container_name}' gracefully stopped (SIGTERM)"
        if exit_code == 1:
            return f"❌ Container '{container_name}' application error (exit code 1)"
        reason_text = f" - {container_reason}" if container_reason else ""
        return f"🔴 Container '{container_name}' failed with exit code {exit_code}{reason_text}"

    @staticmethod
    def _analyze_task_failure(stop_code: str | None, stopped_reason: str | None) -> str:
        """Analyze task-level failure.

        Returns a success message when neither a stop code nor a reason is
        present, a specific message for the recognized stop codes, and a
        generic "Task stopped" message otherwise.
        """
        if not stop_code and not stopped_reason:
            return "✅ Task completed successfully"

        if stop_code == "TaskFailedToStart":
            if stopped_reason and "CannotPullContainerError" in stopped_reason:
                return "📦 Failed to pull container image - check image exists and permissions"
            if stopped_reason and "ResourcesNotAvailable" in stopped_reason:
                return "⚠️ Insufficient resources available to start task"
            reason_text = f" - {stopped_reason}" if stopped_reason else ""
            return f"🚫 Task failed to start{reason_text}"
        if stop_code == "ServiceSchedulerInitiated":
            return "🔄 Task stopped by service scheduler (deployment/scaling)"
        if stop_code == "SpotInterruption":
            return "💸 Task stopped due to spot instance interruption"
        if stop_code == "UserInitiated":
            return "👤 Task manually stopped by user"

        # Join only the parts that exist so the message never contains a
        # doubled or trailing space when the code or the reason is missing.
        parts = ["🔴 Task stopped"]
        if stop_code:
            parts.append(f"({stop_code})")
        if stopped_reason:
            parts.append(f"- {stopped_reason}")
        return " ".join(parts)
215

216

217
def _extract_image_tag(image: str) -> str | None:
    """Return the text after the last ':' of the final path segment, or None.

    Only the last path segment is inspected so that a registry port
    (``localhost:5000/app``) is not mistaken for a tag.
    """
    last_segment = image.rsplit("/", 1)[-1]
    _, separator, tag = last_segment.rpartition(":")
    return tag if separator else None


def _create_task_info(task: TaskTypeDef, desired_task_def_arn: str | None) -> TaskInfo:
    """Create task info from AWS task description.

    Builds a one-line display name (status icon, revision, family, short
    task id, image tags, creation time) alongside the raw values.

    Args:
        task: Task description; ``taskArn`` and ``taskDefinitionArn`` are
            required, ``createdAt`` and ``containers`` are optional.
        desired_task_def_arn: Task definition ARN the task is compared
            against to decide whether it is the desired version.

    Returns:
        A dict with ``name``, ``value``, ``task_def_arn``, ``is_desired``,
        ``revision``, ``images`` and ``created_at``.
    """
    task_arn = task["taskArn"]
    task_def_arn = task["taskDefinitionArn"]
    is_desired = task_def_arn == desired_task_def_arn

    # Short id: first 8 characters of the last ARN path component.
    task_id = task_arn.split("/")[-1][:8]
    # The task definition ARN ends in ".../<family>:<revision>".
    task_def_family = task_def_arn.split("/")[-1].split(":")[0]
    revision = task_def_arn.split(":")[-1]

    created_at = task.get("createdAt")

    # Collect the tag of every container image; untagged images are skipped.
    container_images = []
    for container in task.get("containers", []):
        if "image" not in container:
            continue
        tag = _extract_image_tag(container["image"])
        if tag is not None:
            container_images.append(tag)

    image_display = ", ".join(container_images) if container_images else "unknown"

    status_icon = "✅" if is_desired else "🔴"
    time_str = created_at.strftime("%H:%M:%S") if created_at else "unknown"

    display_name = f"{status_icon} v{revision} {task_def_family} ({task_id}) - {image_display} - {time_str}"

    return {
        "name": display_name,
        "value": task_arn,
        "task_def_arn": task_def_arn,
        "is_desired": is_desired,
        "revision": revision,
        "images": container_images,
        "created_at": created_at,
    }
252

253

254
def _build_task_details(
    task: TaskTypeDef,
    task_definition: TaskDefinitionTypeDef,
    is_desired_version: bool,
) -> TaskDetails:
    """Build comprehensive task details dictionary.

    Args:
        task: Task description; ``taskArn`` and ``taskDefinitionArn`` are
            required, the status and timestamps are optional.
        task_definition: Task definition whose ``containerDefinitions``
            supply the per-container name, image and resource settings.
        is_desired_version: Whether the task runs the desired task
            definition; copied into the result unchanged.

    Returns:
        A dict with the task ARN, task definition family and revision, the
        desired-version flag, the task status (``"UNKNOWN"`` when absent),
        the container list and the created/started timestamps.
    """
    task_def_arn = task["taskDefinitionArn"]
    # The task definition ARN ends in ".../<family>:<revision>".
    task_def_family = task_def_arn.split("/")[-1].split(":")[0]
    task_def_revision = task_def_arn.split(":")[-1]

    # A definition without containerDefinitions yields an empty container
    # list, matching how a missing task["containers"] is treated elsewhere.
    containers = [
        {
            "name": container_def["name"],
            "image": container_def["image"],
            "cpu": container_def.get("cpu"),
            "memory": container_def.get("memory"),
            "memoryReservation": container_def.get("memoryReservation"),
        }
        for container_def in task_definition.get("containerDefinitions", [])
    ]

    return {
        "task_arn": task["taskArn"],
        "task_definition_name": task_def_family,
        "task_definition_revision": task_def_revision,
        "is_desired_version": is_desired_version,
        "task_status": task.get("lastStatus", "UNKNOWN"),
        "containers": containers,
        "created_at": task.get("createdAt"),
        "started_at": task.get("startedAt"),
    }
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc