• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

google / sedpack / 12117392047

02 Dec 2024 10:28AM UTC coverage: 86.849% (-0.2%) from 87.028%
12117392047

Pull #70

github

web-flow
Merge 5653b6fcd into c6d1a1555
Pull Request #70: Bump PyO3 version to support Python3.13

2351 of 2707 relevant lines covered (86.85%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.87
/src/sedpack/io/flatbuffer/iterate.py
1
# Copyright 2024 Google LLC
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License");
4
# you may not use this file except in compliance with the License.
5
# You may obtain a copy of the License at
6
#
7
#     https://www.apache.org/licenses/LICENSE-2.0
8
#
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS,
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
# See the License for the specific language governing permissions and
13
# limitations under the License.
14
"""Iterate a FlatBuffers shard. See src/sedpack/io/flatbuffer/shard.fbs
15
and sedpack.io.shard.shard_writer_flatbuffer.ShardWriterFlatBuffer for more
16
information on how it is saved.
17
"""
18

19
from pathlib import Path
1✔
20
from typing import Iterable
1✔
21

22
import aiofiles
1✔
23
import numpy as np
1✔
24

25
from sedpack.io.compress import CompressedFile
1✔
26
from sedpack.io.metadata import Attribute
1✔
27
from sedpack.io.types import ExampleT
1✔
28
from sedpack.io.shard import IterateShardBase
1✔
29
from sedpack.io.shard.iterate_shard_base import T
1✔
30
from sedpack.io.utils import func_or_identity
1✔
31

32
# Autogenerated from src/sedpack/io/flatbuffer/shard.fbs
33
import sedpack.io.flatbuffer.shardfile.Example as fbapi_Example
1✔
34
import sedpack.io.flatbuffer.shardfile.Shard as fbapi_Shard
1✔
35

36

37
class IterateShardFlatBuffer(IterateShardBase[T]):
    """Iterate FlatBuffers shards.

    Remember everything needed to be able to iterate shards. This can be
    pickled and passed as a callable object into another process.
    """

    def _iterate_content(self, content: bytes) -> Iterable[ExampleT]:
        """Decode every example stored in a decompressed FlatBuffers shard.

        Args:

          content (bytes): The whole (already decompressed) shard content.

        Returns: a generator of examples, each a dictionary mapping the
        attribute name to a freshly copied np.ndarray.
        """
        shard = fbapi_Shard.Shard.GetRootAs(content, 0)

        # The attribute description is the same for every example, look it up
        # once instead of once per example. Materialized so that it can be
        # enumerated repeatedly.
        attributes = tuple(self.dataset_structure.saved_data_description)

        for example_id in range(shard.ExamplesLength()):
            example: fbapi_Example.Example = shard.Examples(example_id)

            example_dictionary: ExampleT = {}

            for attribute_id, attribute in enumerate(attributes):
                # No-copy fast retrieval, represented as bytes.
                # This is a manually written method which uses the fact
                # that we know what dtype to decode. It might be cleaner to do
                # this using a union. There are two caveats:
                # - FlatBuffers only support a subset of types we care about
                #   (e.g., float16 which is not included in
                #   flatbuffers/python/flatbuffers/number_types.py).
                # - Speed, since we first need to check the type for every
                #   attribute.
                # Bytearray representation. Little endian, just loaded.
                np_bytes = example.Attributes(
                    attribute_id).AttributeBytesAsNumpy()

                np_array = IterateShardFlatBuffer.decode_array(
                    np_bytes=np_bytes,
                    attribute=attribute,
                )

                # Copy otherwise the arrays are immutable and keep the whole
                # file content from being garbage collected.
                example_dictionary[attribute.name] = np.copy(np_array)

            yield example_dictionary

    @staticmethod
    def decode_array(np_bytes: np.ndarray,
                     attribute: Attribute,
                     batch_size: int = 0) -> np.ndarray:
        """Decode an array. See `sedpack.io.shard.shard_writer_flatbuffer
        .ShardWriterFlatBuffer.save_numpy_vector_as_bytearray`
        for format description. The code tries to avoid unnecessary copies.

        Args:

          np_bytes (np.ndarray): The bytes as an np.array of bytes.

          attribute (Attribute): Description of the final array (dtype and
          shape).

          batch_size (int): If `batch_size` is larger than zero we received a
          batch of these attributes. In case when `batch_size == -1` the
          `np.reshape` auto-deduces the batch dimension. Any other value
          means we received exactly one value of this attribute.

        Returns: the parsed np.ndarray of the correct dtype and shape. It is a
        view into `np_bytes` (no copy is made here).
        """
        # FlatBuffers are little-endian. There is no byteswap by
        # `np.frombuffer` but the array will be interpreted correctly.
        little_endian_dtype = np.dtype(attribute.dtype).newbyteorder("<")
        np_array = np.frombuffer(
            buffer=np_bytes,  # a view into the buffer, not a copy
            dtype=little_endian_dtype,
        )

        # Reshape if needed.
        if batch_size > 0 or batch_size == -1:
            return np_array.reshape((batch_size, *attribute.shape))
        return np_array.reshape(attribute.shape)

    def iterate_shard(self, file_path: Path) -> Iterable[ExampleT]:
        """Iterate a shard.

        Args:

          file_path (Path): Path to the (possibly compressed) shard file.

        Returns: a generator of decoded examples.
        """
        # Read then decompress (nice for benchmarking).
        with open(file_path, "rb") as f:
            content = f.read()
        content = CompressedFile(
            self.dataset_structure.compression).decompress(content)
        yield from self._iterate_content(content=content)

    async def iterate_shard_async(self, file_path: Path):
        """Asynchronously iterate a shard.

        The file is read asynchronously and closed before decompressing,
        the same way as `iterate_shard` does, so the file handle is not held
        open while decompressing.

        NOTE(review): `decompress` and the decoding are synchronous calls (not
        awaited), so they run on the event loop thread.

        Args:

          file_path (Path): Path to the (possibly compressed) shard file.
        """
        async with aiofiles.open(file_path, "rb") as f:
            content = await f.read()
        content = CompressedFile(
            self.dataset_structure.compression).decompress(content)

        # `yield from` is not allowed in async generators.
        for example in self._iterate_content(content=content):
            yield example

    def process_and_list(self, shard_file: Path) -> list[T]:
        """Return a list of processed examples. Used as a function call in a
        different process. Returning a list as opposed to an iterator allows to
        do all work in another process and all that needs to be done is a
        memory copy between processes.

        TODO think of a way to avoid copying memory between processes.

        Args:

            shard_file (Path): Path to the shard file.

        Returns: all examples of the shard, each passed through
        `self.process_record` (or left unchanged when it is not set, as
        decided by `func_or_identity`).
        """
        process_record = func_or_identity(self.process_record)

        return [
            process_record(example)
            for example in self.iterate_shard(shard_file)
        ]
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc