• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

OSGeo / gdal / 12706066811

10 Jan 2025 08:38AM UTC coverage: 70.084% (-2.5%) from 72.549%
12706066811

Pull #11629

github

web-flow
Merge 9418dc48f into 0df468c56
Pull Request #11629: add uv documentation for python package

563296 of 803749 relevant lines covered (70.08%)

223434.74 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

57.27
/ogr/ogrsf_frmts/arrow_common/vsiarrowfilesystem.hpp
1
/******************************************************************************
2
 *
3
 * Project:  Parquet Translator
4
 * Purpose:  Implements VSIArrowFileSystem, an arrow::fs::FileSystem on top of the VSI file API.
5
 * Author:   Even Rouault, <even.rouault at spatialys.com>
6
 *
7
 ******************************************************************************
8
 * Copyright (c) 2022, Planet Labs
9
 *
10
 * SPDX-License-Identifier: MIT
11
 ****************************************************************************/
12

13
#ifndef VSIARROWFILESYSTEM_HPP_INCLUDED
14
#define VSIARROWFILESYSTEM_HPP_INCLUDED
15

16
#include "arrow/util/config.h"
17

18
#include "ograrrowrandomaccessfile.h"
19

20
#include <atomic>
21
#include <memory>
22
#include <mutex>
23
#include <vector>
24
#include <utility>
25

26
/************************************************************************/
27
/*                         VSIArrowFileSystem                           */
28
/************************************************************************/
29

30
class VSIArrowFileSystem final : public arrow::fs::FileSystem
31
{
32
    const std::string m_osEnvVarPrefix;
33
    const std::string m_osQueryParameters;
34

35
    std::atomic<bool> m_bAskedToClosed = false;
36
    std::mutex m_oMutex{};
37
    std::vector<std::pair<std::string, std::weak_ptr<OGRArrowRandomAccessFile>>>
38
        m_oSetFiles{};
39

40
  public:
41
    VSIArrowFileSystem(const std::string &osEnvVarPrefix,
266✔
42
                       const std::string &osQueryParameters)
43
        : m_osEnvVarPrefix(osEnvVarPrefix),
266✔
44
          m_osQueryParameters(osQueryParameters)
266✔
45
    {
46
    }
266✔
47

48
    // Cf comment in OGRParquetDataset::~OGRParquetDataset() for rationale
49
    // for this method
50
    void AskToClose()
265✔
51
    {
52
        m_bAskedToClosed = true;
265✔
53
        std::vector<
54
            std::pair<std::string, std::weak_ptr<OGRArrowRandomAccessFile>>>
55
            oSetFiles;
530✔
56
        {
57
            std::lock_guard oLock(m_oMutex);
530✔
58
            oSetFiles = m_oSetFiles;
265✔
59
        }
60
        for (auto &[osName, poFile] : oSetFiles)
1,174✔
61
        {
62
            bool bWarned = false;
909✔
63
            while (!poFile.expired())
909✔
64
            {
65
                if (!bWarned)
×
66
                {
67
                    bWarned = true;
×
68
                    auto poFileLocked = poFile.lock();
×
69
                    if (poFileLocked)
×
70
                    {
71
                        CPLDebug("PARQUET",
×
72
                                 "Still on-going reads on %s. Waiting for it "
73
                                 "to be closed.",
74
                                 osName.c_str());
75
                        poFileLocked->AskToClose();
×
76
                    }
77
                }
78
                CPLSleep(0.01);
×
79
            }
80
        }
81
    }
265✔
82

83
    std::string type_name() const override
×
84
    {
85
        return "vsi" + m_osEnvVarPrefix;
×
86
    }
87

88
    using arrow::fs::FileSystem::Equals;
89

90
    bool Equals(const arrow::fs::FileSystem &other) const override
×
91
    {
92
        const auto poOther = dynamic_cast<const VSIArrowFileSystem *>(&other);
×
93
        return poOther != nullptr &&
×
94
               poOther->m_osEnvVarPrefix == m_osEnvVarPrefix &&
×
95
               poOther->m_osQueryParameters == m_osQueryParameters;
×
96
    }
97

98
    using arrow::fs::FileSystem::GetFileInfo;
99

100
    arrow::Result<arrow::fs::FileInfo>
101
    GetFileInfo(const std::string &path) override
255✔
102
    {
103
        auto fileType = arrow::fs::FileType::Unknown;
255✔
104
        VSIStatBufL sStat;
105
        if (VSIStatL(path.c_str(), &sStat) == 0)
255✔
106
        {
107
            if (VSI_ISREG(sStat.st_mode))
255✔
108
                fileType = arrow::fs::FileType::File;
252✔
109
            else if (VSI_ISDIR(sStat.st_mode))
3✔
110
                fileType = arrow::fs::FileType::Directory;
3✔
111
        }
112
        else
113
        {
114
            fileType = arrow::fs::FileType::NotFound;
×
115
        }
116
        arrow::fs::FileInfo info(path, fileType);
510✔
117
        if (fileType == arrow::fs::FileType::File)
255✔
118
            info.set_size(sStat.st_size);
252✔
119
        return info;
510✔
120
    }
121

122
    arrow::Result<arrow::fs::FileInfoVector>
123
    GetFileInfo(const arrow::fs::FileSelector &select) override
3✔
124
    {
125
        arrow::fs::FileInfoVector res;
6✔
126
        VSIDIR *psDir = VSIOpenDir(select.base_dir.c_str(),
3✔
127
                                   select.recursive ? -1 : 0, nullptr);
3✔
128
        if (psDir == nullptr)
3✔
129
            return res;
×
130

131
        bool bParquetFound = false;
3✔
132
        const int nMaxNonParquetFiles = atoi(
3✔
133
            CPLGetConfigOption("OGR_PARQUET_MAX_NON_PARQUET_FILES", "100"));
134
        const int nMaxListedFiles =
135
            atoi(CPLGetConfigOption("OGR_PARQUET_MAX_LISTED_FILES", "1000000"));
3✔
136
        while (const auto psEntry = VSIGetNextDirEntry(psDir))
10✔
137
        {
138
            if (!bParquetFound)
7✔
139
                bParquetFound =
3✔
140
                    EQUAL(CPLGetExtension(psEntry->pszName), "parquet");
3✔
141

142
            const std::string osFilename =
143
                select.base_dir + '/' + psEntry->pszName;
7✔
144
            int nMode = psEntry->nMode;
7✔
145
            if (!psEntry->bModeKnown)
7✔
146
            {
147
                VSIStatBufL sStat;
148
                if (VSIStatL(osFilename.c_str(), &sStat) == 0)
×
149
                    nMode = sStat.st_mode;
×
150
            }
151

152
            auto fileType = arrow::fs::FileType::Unknown;
7✔
153
            if (VSI_ISREG(nMode))
7✔
154
                fileType = arrow::fs::FileType::File;
7✔
155
            else if (VSI_ISDIR(nMode))
×
156
                fileType = arrow::fs::FileType::Directory;
×
157

158
            arrow::fs::FileInfo info(osFilename, fileType);
7✔
159
            if (fileType == arrow::fs::FileType::File && psEntry->bSizeKnown)
7✔
160
            {
161
                info.set_size(psEntry->nSize);
7✔
162
            }
163
            res.push_back(info);
7✔
164

165
            if (m_osEnvVarPrefix == "PARQUET")
7✔
166
            {
167
                // Avoid iterating over too many files if there's no likely parquet
168
                // files.
169
                if (static_cast<int>(res.size()) == nMaxNonParquetFiles &&
7✔
170
                    !bParquetFound)
×
171
                    break;
×
172
                if (static_cast<int>(res.size()) == nMaxListedFiles)
7✔
173
                    break;
×
174
            }
175
        }
7✔
176
        VSICloseDir(psDir);
3✔
177
        return res;
3✔
178
    }
179

180
    arrow::Status CreateDir(const std::string & /*path*/,
×
181
                            bool /*recursive*/ = true) override
182
    {
183
        return arrow::Status::IOError("CreateDir() unimplemented");
×
184
    }
185

186
    arrow::Status DeleteDir(const std::string & /*path*/) override
×
187
    {
188
        return arrow::Status::IOError("DeleteDir() unimplemented");
×
189
    }
190

191
    arrow::Status DeleteDirContents(const std::string & /*path*/
×
192
#if ARROW_VERSION_MAJOR >= 8
193
                                    ,
194
                                    bool /*missing_dir_ok*/ = false
195
#endif
196
                                    ) override
197
    {
198
        return arrow::Status::IOError("DeleteDirContents() unimplemented");
×
199
    }
200

201
    arrow::Status DeleteRootDirContents() override
×
202
    {
203
        return arrow::Status::IOError("DeleteRootDirContents() unimplemented");
×
204
    }
205

206
    arrow::Status DeleteFile(const std::string & /*path*/) override
×
207
    {
208
        return arrow::Status::IOError("DeleteFile() unimplemented");
×
209
    }
210

211
    arrow::Status Move(const std::string & /*src*/,
×
212
                       const std::string & /*dest*/) override
213
    {
214
        return arrow::Status::IOError("Move() unimplemented");
×
215
    }
216

217
    arrow::Status CopyFile(const std::string & /*src*/,
×
218
                           const std::string & /*dest*/) override
219
    {
220
        return arrow::Status::IOError("CopyFile() unimplemented");
×
221
    }
222

223
    using arrow::fs::FileSystem::OpenInputStream;
224

225
    arrow::Result<std::shared_ptr<arrow::io::InputStream>>
226
    OpenInputStream(const std::string &path) override
×
227
    {
228
        return OpenInputFile(path);
×
229
    }
230

231
    using arrow::fs::FileSystem::OpenInputFile;
232

233
    arrow::Result<std::shared_ptr<arrow::io::RandomAccessFile>>
234
    OpenInputFile(const std::string &path) override
910✔
235
    {
236
        if (m_bAskedToClosed)
910✔
237
            return arrow::Status::IOError(
×
238
                "OpenInputFile(): file system in shutdown");
×
239

240
        std::string osPath(path);
1,820✔
241
        osPath += m_osQueryParameters;
910✔
242
        CPLDebugOnly(m_osEnvVarPrefix.c_str(), "Opening %s", osPath.c_str());
910✔
243
        auto fp = VSIVirtualHandleUniquePtr(VSIFOpenL(osPath.c_str(), "rb"));
1,820✔
244
        if (fp == nullptr)
910✔
245
            return arrow::Status::IOError("OpenInputFile() failed for " +
×
246
                                          osPath);
×
247
        auto poFile =
248
            std::make_shared<OGRArrowRandomAccessFile>(osPath, std::move(fp));
1,820✔
249
        {
250
            std::lock_guard oLock(m_oMutex);
1,820✔
251
            m_oSetFiles.emplace_back(path, poFile);
910✔
252
        }
253
        return poFile;
910✔
254
    }
255

256
    using arrow::fs::FileSystem::OpenOutputStream;
257

258
    arrow::Result<std::shared_ptr<arrow::io::OutputStream>>
259
    OpenOutputStream(const std::string & /*path*/,
×
260
                     const std::shared_ptr<const arrow::KeyValueMetadata>
261
                         & /* metadata */) override
262
    {
263
        return arrow::Status::IOError("OpenOutputStream() unimplemented");
×
264
    }
265

266
    arrow::Result<std::shared_ptr<arrow::io::OutputStream>>
267
    OpenAppendStream(const std::string & /*path*/,
×
268
                     const std::shared_ptr<const arrow::KeyValueMetadata>
269
                         & /* metadata */) override
270
    {
271
        return arrow::Status::IOError("OpenAppendStream() unimplemented");
×
272
    }
273
};
274

275
#endif  // VSIARROWFILESYSTEM_HPP_INCLUDED
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc