• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pantsbuild / pants / 19015773527

02 Nov 2025 05:33PM UTC coverage: 17.872% (-62.4%) from 80.3%
19015773527

Pull #22816

github

web-flow
Merge a12d75757 into 6c024e162
Pull Request #22816: Update Pants internal Python to 3.14

4 of 5 new or added lines in 3 files covered. (80.0%)

28452 existing lines in 683 files now uncovered.

9831 of 55007 relevant lines covered (17.87%)

0.18 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/src/python/pants/bsp/spec/task.py
1
# Copyright 2022 Pants project contributors (see CONTRIBUTORS.md).
2
# Licensed under the Apache License, Version 2.0 (see LICENSE).
UNCOV
3
from __future__ import annotations
×
4

UNCOV
5
from dataclasses import dataclass
×
UNCOV
6
from enum import Enum
×
UNCOV
7
from typing import Any
×
8

UNCOV
9
from pants.bsp.spec.base import StatusCode, TaskId
×
UNCOV
10
from pants.bsp.spec.compile import CompileReport, CompileTask
×
UNCOV
11
from pants.bsp.spec.notification import BSPNotification
×
12

13
# -----------------------------------------------------------------------------------------------
14
# Task Notifications
15
# See https://build-server-protocol.github.io/docs/specification.html#compile-request
16
# -----------------------------------------------------------------------------------------------
17

18

UNCOV
19
class TaskDataKind(Enum):
×
20
    # `data` field must contain a CompileTask object.
UNCOV
21
    COMPILE_TASK = "compile-task"
×
22

23
    #  `data` field must contain a CompileReport object.
UNCOV
24
    COMPILE_REPORT = "compile-report"
×
25

26
    # `data` field must contain a TestTask object.
UNCOV
27
    TEST_TASK = "test-task"
×
28

29
    # `data` field must contain a TestReport object.
UNCOV
30
    TEST_REPORT = "test-report"
×
31

32
    # `data` field must contain a TestStart object.
UNCOV
33
    TEST_START = "test-start"
×
34

35
    # `data` field must contain a TestFinish object.
UNCOV
36
    TEST_FINISH = "test-finish"
×
37

38

UNCOV
39
@dataclass(frozen=True)
×
UNCOV
40
class TaskStartParams(BSPNotification):
×
UNCOV
41
    notification_name = "build/taskStart"
×
42

43
    # Unique id of the task with optional reference to parent task id
UNCOV
44
    task_id: TaskId
×
45

46
    # Timestamp of when the event started in milliseconds since Epoch.
UNCOV
47
    event_time: int | None = None
×
48

49
    # Message describing the task.
UNCOV
50
    message: str | None = None
×
51

52
    # Task-specific data.
53
    # Note: This field is serialized as two fields: `dataKind` (for type name) and `data`.
UNCOV
54
    data: CompileTask | None = None
×
55

UNCOV
56
    def to_json_dict(self) -> dict[str, Any]:
×
57
        result: dict[str, Any] = {"taskId": self.task_id.to_json_dict()}
×
58
        if self.event_time is not None:
×
59
            result["eventTime"] = self.event_time
×
60
        if self.message is not None:
×
61
            result["message"] = self.message
×
62
        if self.data is not None:
×
63
            if isinstance(self.data, CompileTask):
×
64
                result["dataKind"] = TaskDataKind.COMPILE_TASK.value
×
65
            else:
66
                raise AssertionError(
×
67
                    f"TaskStartParams contained an unexpected instance: {self.data}"
68
                )
69
            result["data"] = self.data.to_json_dict()
×
70
        return result
×
71

72

UNCOV
73
@dataclass(frozen=True)
×
UNCOV
74
class TaskProgressParams(BSPNotification):
×
UNCOV
75
    notification_name = "build/taskProgress"
×
76

77
    # Unique id of the task with optional reference to parent task id
UNCOV
78
    task_id: TaskId
×
79

80
    # Timestamp of when the progress event was generated in milliseconds since Epoch.
UNCOV
81
    event_time: int | None = None
×
82

83
    # Message describing the task progress.
84
    # Information about the state of the task at the time the event is sent.
UNCOV
85
    message: str | None = None
×
86

87
    # If known, total amount of work units in this task.
UNCOV
88
    total: int | None = None
×
89

90
    # If known, completed amount of work units in this task.
UNCOV
91
    progress: int | None = None
×
92

93
    # Name of a work unit. For example, "files" or "tests". May be empty.
UNCOV
94
    unit: str | None = None
×
95

96
    # TODO: `data` field is not currently represented. Once we know what types will be sent, then it can be bound.
97

UNCOV
98
    def to_json_dict(self) -> dict[str, Any]:
×
99
        result: dict[str, Any] = {"taskId": self.task_id.to_json_dict()}
×
100
        if self.event_time is not None:
×
101
            result["eventTime"] = self.event_time
×
102
        if self.message is not None:
×
103
            result["message"] = self.message
×
104
        if self.total is not None:
×
105
            result["total"] = self.total
×
106
        if self.progress is not None:
×
107
            result["progress"] = self.progress
×
108
        if self.unit is not None:
×
109
            result["unit"] = self.unit
×
110
        return result
×
111

112

UNCOV
113
@dataclass(frozen=True)
×
UNCOV
114
class TaskFinishParams(BSPNotification):
×
UNCOV
115
    notification_name = "build/taskFinish"
×
116

117
    # Unique id of the task with optional reference to parent task id
UNCOV
118
    task_id: TaskId
×
119

120
    # Timestamp of the event in milliseconds.
UNCOV
121
    event_time: int | None = None
×
122

123
    # Message describing the finish event.
UNCOV
124
    message: str | None = None
×
125

126
    # Task completion status.
UNCOV
127
    status: StatusCode = StatusCode.OK
×
128

129
    # Task-specific data.
130
    # Note: This field is serialized as two fields: `dataKind` (for type name) and `data`.
UNCOV
131
    data: CompileReport | None = None
×
132

UNCOV
133
    def to_json_dict(self) -> dict[str, Any]:
×
134
        result: dict[str, Any] = {
×
135
            "taskId": self.task_id.to_json_dict(),
136
            "status": self.status.value,
137
        }
138
        if self.event_time is not None:
×
139
            result["eventTime"] = self.event_time
×
140
        if self.message is not None:
×
141
            result["message"] = self.message
×
142
        if self.data is not None:
×
143
            if isinstance(self.data, CompileReport):
×
144
                result["dataKind"] = TaskDataKind.COMPILE_REPORT.value
×
145
            else:
146
                raise AssertionError(
×
147
                    f"TaskFinishParams contained an unexpected instance: {self.data}"
148
                )
149
            result["data"] = self.data.to_json_dict()
×
150
        return result
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc