• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

vertti / lazy-ecs / 20895791057

11 Jan 2026 01:18PM UTC coverage: 89.95% (-0.03%) from 89.981%
20895791057

Pull #106

github

web-flow
Merge a168470a1 into 98985b196
Pull Request #106: feat: add stop task functionality

23 of 26 new or added lines in 4 files covered. (88.46%)

10 existing lines in 1 file now uncovered.

1441 of 1602 relevant lines covered (89.95%)

0.9 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.1
/src/lazy_ecs/features/task/task.py
1
"""Task operations for ECS."""
2

3
from __future__ import annotations
1✔
4

5
from typing import TYPE_CHECKING
1✔
6

7
from botocore.exceptions import BotoCoreError, ClientError
1✔
8

9
from ...core.base import BaseAWSService
1✔
10
from ...core.types import TaskDetails, TaskHistoryDetails, TaskInfo
1✔
11
from ...core.utils import batch_items, paginate_aws_list
1✔
12

13
if TYPE_CHECKING:
14
    from mypy_boto3_ecs.client import ECSClient
15
    from mypy_boto3_ecs.type_defs import TaskDefinitionTypeDef, TaskTypeDef
16

17

18
class TaskService(BaseAWSService):
1✔
19
    """Service for ECS task operations."""
20

21
    def __init__(self, ecs_client: ECSClient) -> None:
1✔
22
        super().__init__(ecs_client)
1✔
23

24
    def get_tasks(self, cluster_name: str, service_name: str) -> list[str]:
1✔
25
        return paginate_aws_list(
1✔
26
            self.ecs_client,
27
            "list_tasks",
28
            "taskArns",
29
            cluster=cluster_name,
30
            serviceName=service_name,
31
        )
32

33
    def stop_task(
1✔
34
        self, cluster_name: str, task_arn: str, reason: str = "Stopped via lazy-ecs"
35
    ) -> tuple[bool, str | None]:
36
        try:
1✔
37
            self.ecs_client.stop_task(cluster=cluster_name, task=task_arn, reason=reason)
1✔
38
            return True, None
1✔
39
        except ClientError as e:
1✔
40
            return False, e.response.get("Error", {}).get("Message", str(e))
1✔
NEW
41
        except BotoCoreError as e:
×
NEW
42
            return False, str(e)
×
43

44
    def get_task_info(self, cluster_name: str, service_name: str, desired_task_def_arn: str | None) -> list[TaskInfo]:
1✔
45
        task_arns = self.get_tasks(cluster_name, service_name)
1✔
46
        if not task_arns:
1✔
47
            return []
×
48

49
        all_tasks = [
1✔
50
            task
51
            for batch in batch_items(task_arns, 100)
52
            for task in self.ecs_client.describe_tasks(cluster=cluster_name, tasks=batch).get("tasks", [])
53
        ]
54

55
        return [_create_task_info(task, desired_task_def_arn) for task in all_tasks]
1✔
56

57
    def get_task_details(
1✔
58
        self,
59
        cluster_name: str,
60
        task_arn: str,
61
        desired_task_def_arn: str | None,
62
    ) -> TaskDetails | None:
63
        result = self.get_task_and_definition(cluster_name, task_arn)
1✔
64
        if not result:
1✔
65
            return None
1✔
66

67
        task, task_definition = result
1✔
68
        is_desired_version = task["taskDefinitionArn"] == desired_task_def_arn
1✔
69
        return _build_task_details(task, task_definition, is_desired_version)
1✔
70

71
    def get_task_and_definition(
1✔
72
        self,
73
        cluster_name: str,
74
        task_arn: str,
75
    ) -> tuple[TaskTypeDef, TaskDefinitionTypeDef] | None:
76
        task_response = self.ecs_client.describe_tasks(cluster=cluster_name, tasks=[task_arn])
1✔
77
        tasks = task_response.get("tasks", [])
1✔
78
        if not tasks:
1✔
79
            return None
1✔
80

81
        task = tasks[0]
1✔
82
        task_def_arn = task["taskDefinitionArn"]
1✔
83
        task_def_response = self.ecs_client.describe_task_definition(taskDefinition=task_def_arn)
1✔
84
        task_definition = task_def_response.get("taskDefinition")
1✔
85
        if not task_definition:
1✔
86
            return None
1✔
87

88
        return task, task_definition
1✔
89

90
    def _list_tasks_by_status(self, cluster_name: str, service_name: str | None, desired_status: str) -> list[str]:
1✔
91
        """List tasks filtered by status and optional service name."""
92
        kwargs = {"cluster": cluster_name, "desiredStatus": desired_status}
1✔
93
        if service_name:
1✔
94
            kwargs["serviceName"] = service_name
1✔
95
        return paginate_aws_list(self.ecs_client, "list_tasks", "taskArns", **kwargs)
1✔
96

97
    def get_task_history(self, cluster_name: str, service_name: str | None = None) -> list[TaskHistoryDetails]:
1✔
98
        """Get task history including stopped tasks with failure information."""
99
        task_arns = []
1✔
100

101
        running_arns = self._list_tasks_by_status(cluster_name, service_name, "RUNNING")
1✔
102
        task_arns.extend(running_arns)
1✔
103

104
        stopped_arns = self._list_tasks_by_status(cluster_name, service_name, "STOPPED")
1✔
105
        task_arns.extend(stopped_arns)
1✔
106

107
        if not task_arns:
1✔
108
            return []
1✔
109

110
        all_tasks = [
1✔
111
            task
112
            for batch in batch_items(task_arns, 100)
113
            for task in self.ecs_client.describe_tasks(cluster=cluster_name, tasks=batch).get("tasks", [])
114
        ]
115

116
        return [self._parse_task_history(task) for task in all_tasks]
1✔
117

118
    def get_task_failure_analysis(self, task_history: TaskHistoryDetails) -> str:
1✔
119
        """Analyze task failure and provide human-readable explanation."""
120
        if task_history["last_status"] == "RUNNING":
1✔
121
            return "✅ Task is currently running"
1✔
122

123
        stop_code = task_history["stop_code"]
1✔
124
        stopped_reason = task_history["stopped_reason"]
1✔
125

126
        # Check container exit codes
127
        for container in task_history["containers"]:
1✔
128
            exit_code = container["exit_code"]
1✔
129
            container_reason = container["reason"]
1✔
130

131
            if exit_code is not None and exit_code != 0:
1✔
132
                return self._analyze_container_failure(
1✔
133
                    container["name"],
134
                    exit_code,
135
                    container_reason,
136
                    stop_code,
137
                    stopped_reason,
138
                )
139

140
        # No container failures, analyze task-level issues
141
        return self._analyze_task_failure(stop_code, stopped_reason)
1✔
142

143
    @staticmethod
1✔
144
    def _parse_task_history(task: TaskTypeDef) -> TaskHistoryDetails:
1✔
145
        """Parse task data into TaskHistoryDetails structure."""
146
        task_arn = task["taskArn"]
1✔
147
        task_def_arn = task["taskDefinitionArn"]
1✔
148
        task_def_family = task_def_arn.split("/")[-1].split(":")[0]
1✔
149
        task_def_revision = task_def_arn.split(":")[-1]
1✔
150

151
        containers = []
1✔
152
        for container in task.get("containers", []):
1✔
153
            containers.append(
1✔
154
                {
155
                    "name": container["name"],
156
                    "exit_code": container.get("exitCode"),
157
                    "reason": container.get("reason"),
158
                    "health_status": container.get("healthStatus"),
159
                    "last_status": container.get("lastStatus", "UNKNOWN"),
160
                },
161
            )
162

163
        return {
1✔
164
            "task_arn": task_arn,
165
            "task_definition_name": task_def_family,
166
            "task_definition_revision": task_def_revision,
167
            "last_status": task.get("lastStatus", "UNKNOWN"),
168
            "desired_status": task.get("desiredStatus", "UNKNOWN"),
169
            "stop_code": task.get("stopCode"),
170
            "stopped_reason": task.get("stoppedReason"),
171
            "created_at": task.get("createdAt"),
172
            "started_at": task.get("startedAt"),
173
            "stopped_at": task.get("stoppedAt"),
174
            "containers": containers,
175
        }
176

177
    @staticmethod
1✔
178
    def _analyze_container_failure(
1✔
179
        container_name: str,
180
        exit_code: int,
181
        container_reason: str | None,
182
        _stop_code: str | None,
183
        _stopped_reason: str | None,
184
    ) -> str:
185
        """Analyze container-level failure."""
186
        if exit_code == 137:
1✔
187
            if container_reason and "OutOfMemoryError" in container_reason:
1✔
188
                return f"🔴 Container '{container_name}' killed due to out of memory (OOM)"
1✔
189
            return f"⏰ Container '{container_name}' killed after timeout (exit code 137)"
1✔
190
        if exit_code == 139:
1✔
191
            return f"💥 Container '{container_name}' crashed with segmentation fault (exit code 139)"
1✔
192
        if exit_code == 143:
1✔
193
            return f"🛑 Container '{container_name}' gracefully stopped (SIGTERM)"
1✔
194
        if exit_code == 1:
1✔
195
            return f"❌ Container '{container_name}' application error (exit code 1)"
1✔
196
        reason_text = f" - {container_reason}" if container_reason else ""
1✔
197
        return f"🔴 Container '{container_name}' failed with exit code {exit_code}{reason_text}"
1✔
198

199
    @staticmethod
1✔
200
    def _analyze_task_failure(stop_code: str | None, stopped_reason: str | None) -> str:
1✔
201
        """Analyze task-level failure."""
202
        if not stop_code and not stopped_reason:
1✔
203
            return "✅ Task completed successfully"
1✔
204

205
        if stop_code == "TaskFailedToStart":
1✔
206
            if stopped_reason and "CannotPullContainerError" in stopped_reason:
1✔
207
                return "📦 Failed to pull container image - check image exists and permissions"
1✔
208
            if stopped_reason and "ResourcesNotAvailable" in stopped_reason:
1✔
209
                return "⚠️ Insufficient resources available to start task"
1✔
210
            reason_text = f" - {stopped_reason}" if stopped_reason else ""
1✔
211
            return f"🚫 Task failed to start{reason_text}"
1✔
212
        if stop_code == "ServiceSchedulerInitiated":
1✔
213
            return "🔄 Task stopped by service scheduler (deployment/scaling)"
1✔
214
        if stop_code == "SpotInterruption":
1✔
215
            return "💸 Task stopped due to spot instance interruption"
1✔
216
        if stop_code == "UserInitiated":
1✔
217
            return "👤 Task manually stopped by user"
1✔
218
        reason_text = f" - {stopped_reason}" if stopped_reason else ""
×
219
        code_text = f"({stop_code}) " if stop_code else ""
×
220
        return f"🔴 Task stopped {code_text}{reason_text}"
×
221

222

223
def _create_task_info(task: TaskTypeDef, desired_task_def_arn: str | None) -> TaskInfo:
1✔
224
    """Create task info from AWS task description."""
225
    task_arn = task["taskArn"]
1✔
226
    task_def_arn = task["taskDefinitionArn"]
1✔
227
    is_desired = task_def_arn == desired_task_def_arn
1✔
228

229
    task_id = task_arn.split("/")[-1][:8]
1✔
230
    task_def_family = task_def_arn.split("/")[-1].split(":")[0]
1✔
231
    revision = task_def_arn.split(":")[-1]
1✔
232

233
    created_at = task.get("createdAt")
1✔
234

235
    container_images = []
1✔
236
    for container in task.get("containers", []):
1✔
237
        if "image" in container:
1✔
238
            image = container["image"]
1✔
239
            if ":" in image:
1✔
240
                container_images.append(image.split(":")[-1])
×
241

242
    image_display = ", ".join(container_images) if container_images else "unknown"
1✔
243

244
    status_icon = "✅" if is_desired else "🔴"
1✔
245
    time_str = created_at.strftime("%H:%M:%S") if created_at else "unknown"
1✔
246

247
    display_name = f"{status_icon} v{revision} {task_def_family} ({task_id}) - {image_display} - {time_str}"
1✔
248

249
    return {
1✔
250
        "name": display_name,
251
        "value": task_arn,
252
        "task_def_arn": task_def_arn,
253
        "is_desired": is_desired,
254
        "revision": revision,
255
        "images": container_images,
256
        "created_at": created_at,
257
    }
258

259

260
def _build_task_details(
1✔
261
    task: TaskTypeDef,
262
    task_definition: TaskDefinitionTypeDef,
263
    is_desired_version: bool,
264
) -> TaskDetails:
265
    """Build comprehensive task details dictionary."""
266
    task_arn = task["taskArn"]
1✔
267
    task_def_arn = task["taskDefinitionArn"]
1✔
268
    task_def_family = task_def_arn.split("/")[-1].split(":")[0]
1✔
269
    task_def_revision = task_def_arn.split(":")[-1]
1✔
270

271
    containers = []
1✔
272
    for container_def in task_definition["containerDefinitions"]:
1✔
273
        container_info = {
1✔
274
            "name": container_def["name"],
275
            "image": container_def["image"],
276
            "cpu": container_def.get("cpu"),
277
            "memory": container_def.get("memory"),
278
            "memoryReservation": container_def.get("memoryReservation"),
279
        }
280
        containers.append(container_info)
1✔
281

282
    return {
1✔
283
        "task_arn": task_arn,
284
        "task_definition_name": task_def_family,
285
        "task_definition_revision": task_def_revision,
286
        "is_desired_version": is_desired_version,
287
        "task_status": task.get("lastStatus", "UNKNOWN"),
288
        "containers": containers,
289
        "created_at": task.get("createdAt"),
290
        "started_at": task.get("startedAt"),
291
    }
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc