• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

IBM / unitxt / 13183653856

06 Feb 2025 04:43PM UTC coverage: 78.9% (-0.4%) from 79.34%
13183653856

Pull #1578

github

web-flow
Merge 42770bd59 into 9844fe0ef
Pull Request #1578: Wml comp

1454 of 1833 branches covered (79.32%)

Branch coverage included in aggregate %.

9192 of 11660 relevant lines covered (78.83%)

0.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

53.42
src/unitxt/image_operators.py
1
import base64
1✔
2
import io
1✔
3
import re
1✔
4
from abc import abstractmethod
1✔
5
from typing import Any, Dict, Tuple
1✔
6

7
import numpy as np
1✔
8
from datasets import Image as DatasetsImage
1✔
9

10
from .augmentors import TaskInputsAugmentor
1✔
11
from .dict_utils import dict_get
1✔
12
from .operator import PackageRequirementsMixin
1✔
13
from .operators import FieldOperator, InstanceFieldOperator
1✔
14
from .settings_utils import get_constants
1✔
15
from .type_utils import isoftype
1✔
16
from .types import Image
1✔
17

18
# Shared constants object obtained from settings_utils.get_constants();
# extract_images() below reads `constants.image_tag` from it.
constants = get_constants()
1✔
19

20
# Module-level instance of `datasets.Image`; it is not referenced anywhere
# else in the visible part of this file.
datasets_image = DatasetsImage()
1✔
21

22

23
def _image_to_bytes(image, format="JPEG"):
    """Serialize ``image`` in memory and return the result as base64 text.

    Args:
        image: Object exposing ``save(fp, format=...)``.
        format: Format name forwarded unchanged to ``image.save``.

    Returns:
        The saved bytes, base64-encoded and decoded to a UTF-8 ``str``
        (despite the function name, the return value is text, not bytes).
    """
    import base64

    buffer = io.BytesIO()
    try:
        image.save(buffer, format=format)
        raw = buffer.getvalue()
    finally:
        # Release the in-memory buffer whether or not saving succeeded.
        buffer.close()
    return base64.b64encode(raw).decode("utf-8")
29

30

31
class ImageDataString(str):
    """``str`` subclass whose ``repr`` abbreviates long contents."""

    def __repr__(self) -> str:
        """Return the normal ``str`` repr for up to 30 characters, else a 30-character preview."""
        if len(self) <= 30:
            return super().__repr__()
        preview = self[:30]
        return f'<ImageDataString "{preview}...">'
36

37

38
def image_to_data_url(image: Image, default_format="JPEG"):
    """Build a base64 ``data:`` URL from an image record.

    https://developer.mozilla.org/en-US/docs/Web/URI/Schemes/data

    Args:
        image: Mapping with an ``"image"`` entry (converted to RGB before
            being encoded) and a ``"format"`` entry.
        default_format: Format used when ``image["format"]`` is falsy.

    Returns:
        An ``ImageDataString`` shaped like
        ``data:image/<format lower-cased>;base64,<payload>``.
    """
    image_format = image["format"] or default_format
    rgb_image = image["image"].convert("RGB")
    # The encoder receives the upper-cased name; the URL uses the lower-cased one.
    payload = _image_to_bytes(rgb_image, format=image_format.upper())
    subtype = image_format.lower()
    return ImageDataString(f"data:image/{subtype};base64,{payload}")
48

49

50
def _bytes_to_image(b64_string):
    """Decode base64 data and open the decoded bytes with ``PIL.Image.open``.

    Args:
        b64_string: Base64-encoded image data.

    Returns:
        The object returned by ``PIL.Image.open`` for the decoded bytes.
    """
    import base64
    import io

    from PIL import Image

    raw = base64.b64decode(b64_string)
    stream = io.BytesIO(raw)
    return Image.open(stream)
60

61

62
def data_url_to_image(data_url: str):
    """Decode a base64 ``data:image/...`` URL into an image.

    Args:
        data_url: String shaped like ``data:image/<format>;base64,<payload>``.

    Returns:
        Whatever ``_bytes_to_image`` returns for the extracted payload.

    Raises:
        ValueError: If the string does not start with ``data:`` or does not
            follow the ``data:image/<format>;base64,<payload>`` layout.
    """
    import re

    # Verify that the string is a data URL
    if not data_url.startswith("data:"):
        raise ValueError("Invalid data URL")

    # DOTALL lets the payload group span line breaks; without it a
    # line-wrapped base64 payload is silently cut at the first newline
    # because ``re.match`` does not have to consume the whole string.
    match = re.match(r"data:image/(.*?);base64,(.*)", data_url, re.DOTALL)
    if match is None:
        raise ValueError("Invalid data URL format")

    # Only the payload is needed; the declared format is not used for decoding.
    b64_data = match.group(2)
    return _bytes_to_image(b64_data)
79

80

81
class PillowMixin(PackageRequirementsMixin):
    """Mixin that exposes the PIL package and three of its modules on the instance.

    ``_requirements_list`` names the ``PIL`` import together with its install
    command (presumably consumed by ``PackageRequirementsMixin`` - confirm there).
    """

    _requirements_list = {"PIL": "pip install pillow"}

    def prepare(self):
        """Run the parent ``prepare`` and then bind the PIL modules to ``self``.

        Afterwards ``self.pil`` is the ``PIL`` package, and ``self.image``,
        ``self.enhance`` and ``self.filter`` are ``PIL.Image``,
        ``PIL.ImageEnhance`` and ``PIL.ImageFilter``.
        """
        super().prepare()
        # Imported here rather than at module level, after the parent prepare has run.
        import PIL
        import PIL.Image
        import PIL.ImageEnhance
        import PIL.ImageFilter

        self.pil = PIL
        self.image = PIL.Image
        self.enhance = PIL.ImageEnhance
        self.filter = PIL.ImageFilter
93

94

95
def extract_images(instance):
    """Collect the values referenced by image tags in ``instance["source"]``.

    Each ``<TAG src="...">`` occurrence (``TAG`` being ``constants.image_tag``,
    quotes single or double) contributes its ``src`` text, which is then
    resolved against ``instance`` with ``dict_get``.

    Args:
        instance: Mapping holding a ``"source"`` string plus whatever the
            ``src`` references point at.

    Returns:
        List of resolved values, in order of appearance in the source text.
    """
    pattern = r"<" + f"{constants.image_tag}" + r'\s+src=["\'](.*?)["\']'
    references = re.findall(pattern, instance["source"])
    return [dict_get(instance, reference) for reference in references]
103

104

105
class EncodeImageToString(FieldOperator):
    """Field operator that replaces an image with ``{"image": <base64 text>}``."""

    # Format name handed to ``image.save`` when serializing.
    image_format: str = "JPEG"

    def encode_image_to_base64(self, image):
        """Save ``image`` in memory using ``image_format`` and return it as a base64 ``ImageDataString``."""
        with io.BytesIO() as buffer:
            image.save(buffer, format=self.image_format)
            encoded = base64.b64encode(buffer.getvalue())
        return ImageDataString(encoded.decode("utf-8"))

    def process_value(self, value: Any) -> Any:
        """Wrap the base64 encoding of ``value`` in a dict under the ``"image"`` key."""
        encoded_image = self.encode_image_to_base64(value)
        return {"image": encoded_image}
115

116

117
class DecodeImage(FieldOperator, PillowMixin):
    """Field operator that turns base64 text into an opened image."""

    def process_value(self, value: str) -> Any:
        """Base64-decode ``value`` and open the bytes with ``self.image.open``."""
        stream = io.BytesIO(base64.b64decode(value))
        return self.image.open(stream)
121

122

123
class ToImage(InstanceFieldOperator):
    """Instance field operator that wraps a value in an ``{"image", "format"}`` record."""

    def process_instance_value(self, value: Any, instance: Dict[str, Any]) -> Image:
        """Return ``value`` paired with its ``format`` attribute, or ``"JPEG"`` when that is ``None``."""
        image_format = value.format
        if image_format is None:
            image_format = "JPEG"
        return {"image": value, "format": image_format}
129

130

131
class ImageFieldOperator(FieldOperator, PillowMixin):
    """Base class for field operators that transform the image inside an image record.

    Subclasses implement ``process_image``; ``process_value`` validates the
    ``"image"`` entry and replaces it with the transformed image.
    """

    @abstractmethod
    def process_image(self, image: Any):
        """Return the transformed version of ``image``; implemented by subclasses."""
        pass

    def process_value(self, value: Image) -> Any:
        """Apply ``process_image`` to ``value["image"]`` in place and return ``value``.

        Raises:
            ValueError: If ``value["image"]`` is not a ``self.image.Image`` instance.
        """
        image = value["image"]
        if not isinstance(image, self.image.Image):
            # Report the type of the offending entry; the enclosing record's
            # type would say nothing about what was wrong.
            raise ValueError(f"ImageFieldOperator requires image, got {type(image)}.")
        value["image"] = self.process_image(image)
        return value
141

142

143
class ImageAugmentor(TaskInputsAugmentor, PillowMixin):
    """Base class for augmentors that rewrite the image inside ``Image`` values.

    Subclasses implement ``process_image``.
    """

    augmented_type: object = Image

    @abstractmethod
    def process_image(self, image: Any):
        """Return the augmented version of ``image``; implemented by subclasses."""
        pass

    def process_value(self, value: Image) -> Any:
        """Augment ``value["image"]`` in place when ``value`` is an ``Image``; otherwise return it untouched."""
        if isoftype(value, Image):
            value["image"] = self.process_image(value["image"])
        return value
155

156

157
class GrayScale(ImageAugmentor):
    """Augmentor that returns a three-channel grayscale copy of an image."""

    def process_image(self, image):
        """Convert ``image`` to mode ``"L"`` and replicate that single channel three times."""
        luminance = np.array(image.convert("L"))

        # (height, width) -> (height, width, 1) -> (height, width, 3), kept
        # three-channel for compatibility with consumers of the result.
        three_channel = np.repeat(luminance[..., np.newaxis], 3, axis=-1)

        return self.image.fromarray(three_channel)
173

174

175
class GridLines(ImageAugmentor):
    """Augmentor that draws a fixed number of horizontal and vertical lines on an image.

    Line ``i`` (1-based) starts at ``i * (size // (num_lines + 1))`` along each
    axis. The spacing is floor-divided, so when an axis is shorter than
    ``num_lines + 1`` pixels the spacing is 0 and every line on that axis
    lands on index 0.

    Args:
        num_lines (int):
            The number of horizontal and vertical lines to add.
        line_thickness (int):
            Thickness of each line in pixels.
        line_color (Tuple[int, int, int]):
            RGB color of the grid lines.

    Methods:
        process_image(image): Adds grid lines to the provided image and returns the modified image.
    """

    num_lines: int = 128
    line_thickness: int = 1
    line_color: Tuple[int, int, int] = (255, 255, 255)

    def process_image(self, image):
        """Return a copy of ``image`` with the grid painted on; expects a three-dimensional pixel array."""
        pixels = np.array(image)
        height, width, _ = pixels.shape

        cells = self.num_lines + 1
        row_step = height // cells
        col_step = width // cells
        thickness = self.line_thickness

        # All lines share one color, so painting row and column bands in the
        # same pass yields the same pixels as painting them in two passes.
        for index in range(1, cells):
            top = index * row_step
            pixels[top : top + thickness, :, :] = self.line_color
            left = index * col_step
            pixels[:, left : left + thickness, :] = self.line_color

        return self.image.fromarray(pixels)
216

217

218
class PixelNoise(ImageAugmentor):
    """A class that overlays a mask of randomly colored nxn squares across an image based on a specified noise rate.

    Args:
        square_size (int):
            Size of each square in pixels.
        noise_rate (float):
            Proportion of the image that should be affected by noise (0 to 1).

    Methods:
        process_image(image):
            Adds the random square mask to the provided image and returns the modified image.
    """

    square_size: int = 1
    noise_rate: float = 0.3  # Probability (0 to 1) that a given square is recolored

    def process_image(self, image):
        """Return a copy of ``image`` with randomly chosen squares recolored.

        Expects a three-dimensional pixel array (height, width, channels).
        Image sizes that are not a multiple of ``square_size`` are supported:
        the squares on the bottom and right edges are simply cropped.
        """
        image_array = np.array(image)
        height, width, channels = image_array.shape
        size = self.square_size

        # Ceiling division so that a partial square on the bottom/right edge
        # is covered too; floor division left the expanded grid smaller than
        # the image and made np.where fail with a broadcast error.
        y_squares = -(-height // size)
        x_squares = -(-width // size)

        # Create a grid indicating where to apply the mask
        noise_mask = np.random.rand(y_squares, x_squares) < self.noise_rate

        # Generate random colors for each square
        colors = np.random.randint(
            0, 256, (y_squares, x_squares, channels), dtype=np.uint8
        )

        # Expand the mask and colors to pixel resolution, then crop any
        # overhang so both match the image size exactly
        mask_expanded = np.repeat(np.repeat(noise_mask, size, axis=0), size, axis=1)
        mask_expanded = mask_expanded[:height, :width]
        colors_expanded = np.repeat(np.repeat(colors, size, axis=0), size, axis=1)
        colors_expanded = colors_expanded[:height, :width]

        # Apply colors where the mask is true; the trailing axis broadcasts
        # the mask over every channel
        image_array = np.where(
            mask_expanded[:, :, np.newaxis], colors_expanded, image_array
        )

        # Convert back to a PIL image
        return self.image.fromarray(image_array)
267

268

269
class Oldify(ImageAugmentor):
    """Augmentor that gives an image an aged look.

    The pipeline is: yellow tint, Gaussian grain, slight blur, lowered
    contrast, slightly raised brightness.
    """

    noise_strength: int = 30  # Scale (standard deviation) of the Gaussian grain
    tint_strength: float = 0.4  # Weight of the yellow tint in the blend

    def process_image(self, image):
        """Return the aged version of ``image``."""
        pixels = np.array(image)

        # Blend every pixel towards an aged-paper yellow.
        tint_color = np.array([255, 228, 170], dtype=np.uint8)
        blended = pixels * (1 - self.tint_strength) + tint_color * self.tint_strength
        tinted = blended.astype(np.uint8)

        # Film grain: signed noise is added, then the sum is clipped to 0..255.
        grain = np.random.normal(0, self.noise_strength, pixels.shape).astype(np.int16)
        grainy = np.clip(tinted + grain, 0, 255).astype(np.uint8)

        # The remaining steps use PIL filters and enhancers.
        aged = self.image.fromarray(grainy)
        aged = aged.filter(self.filter.GaussianBlur(radius=1))

        # Faded look: lower contrast first, then raise brightness a little.
        aged = self.enhance.Contrast(aged).enhance(0.6)
        return self.enhance.Brightness(aged).enhance(1.2)
301

302

303
class ToRGB(ImageFieldOperator):
    """Image field operator that converts the image to RGB mode."""

    def process_image(self, image):
        """Return ``image`` converted with ``image.convert("RGB")``."""
        rgb_image = image.convert("RGB")
        return rgb_image
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc