• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

OSGeo / gdal / 15899174551

26 Jun 2025 10:15AM UTC coverage: 71.083% (-0.001%) from 71.084%
15899174551

push

github

web-flow
Merge pull request #12647 from rouault/parquet_compression_level

Parquet: add a COMPRESSION_LEVEL layer creation option

32 of 32 new or added lines in 2 files covered. (100.0%)

88 existing lines in 42 files now uncovered.

573828 of 807268 relevant lines covered (71.08%)

250001.11 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

56.88
/ogr/ogrsf_frmts/arrow_common/vsiarrowfilesystem.hpp
1
/******************************************************************************
2
 *
3
 * Project:  Parquet Translator
4
 * Purpose:  Implements VSIArrowFileSystem, an arrow::fs::FileSystem on top of GDAL's VSI API.
5
 * Author:   Even Rouault, <even.rouault at spatialys.com>
6
 *
7
 ******************************************************************************
8
 * Copyright (c) 2022, Planet Labs
9
 *
10
 * SPDX-License-Identifier: MIT
11
 ****************************************************************************/
12

13
#ifndef VSIARROWFILESYSTEM_HPP_INCLUDED
14
#define VSIARROWFILESYSTEM_HPP_INCLUDED
15

16
#include "arrow/util/config.h"
17

18
#include "ograrrowrandomaccessfile.h"
19

20
#include <algorithm>
#include <atomic>
#include <memory>
#include <mutex>
#include <string>
#include <utility>
#include <vector>
25

26
#if defined(__clang__)
27
#pragma clang diagnostic push
28
#pragma clang diagnostic ignored "-Wweak-vtables"
29
#endif
30

31
/************************************************************************/
32
/*                         VSIArrowFileSystem                           */
33
/************************************************************************/
34

35
class VSIArrowFileSystem final : public arrow::fs::FileSystem
36
{
37
    const std::string m_osEnvVarPrefix;
38
    const std::string m_osQueryParameters;
39

40
    std::atomic<bool> m_bAskedToClosed = false;
41
    std::mutex m_oMutex{};
42
    std::vector<std::pair<std::string, std::weak_ptr<OGRArrowRandomAccessFile>>>
43
        m_oSetFiles{};
44

45
  public:
46
    VSIArrowFileSystem(const std::string &osEnvVarPrefix,
266✔
47
                       const std::string &osQueryParameters)
48
        : m_osEnvVarPrefix(osEnvVarPrefix),
266✔
49
          m_osQueryParameters(osQueryParameters)
266✔
50
    {
51
    }
266✔
52

53
    // Cf comment in OGRParquetDataset::~OGRParquetDataset() for rationale
54
    // for this method
55
    void AskToClose()
265✔
56
    {
57
        m_bAskedToClosed = true;
265✔
58
        std::vector<
59
            std::pair<std::string, std::weak_ptr<OGRArrowRandomAccessFile>>>
60
            oSetFiles;
530✔
61
        {
62
            std::lock_guard oLock(m_oMutex);
530✔
63
            oSetFiles = m_oSetFiles;
265✔
64
        }
65
        for (auto &[osName, poFile] : oSetFiles)
1,174✔
66
        {
67
            bool bWarned = false;
909✔
68
            while (!poFile.expired())
909✔
69
            {
UNCOV
70
                if (!bWarned)
×
71
                {
UNCOV
72
                    bWarned = true;
×
UNCOV
73
                    auto poFileLocked = poFile.lock();
×
UNCOV
74
                    if (poFileLocked)
×
75
                    {
UNCOV
76
                        CPLDebug("PARQUET",
×
77
                                 "Still on-going reads on %s. Waiting for it "
78
                                 "to be closed.",
79
                                 osName.c_str());
UNCOV
80
                        poFileLocked->AskToClose();
×
81
                    }
82
                }
UNCOV
83
                CPLSleep(0.01);
×
84
            }
85
        }
86
    }
265✔
87

88
    std::string type_name() const override
×
89
    {
90
        return "vsi" + m_osEnvVarPrefix;
×
91
    }
92

93
    using arrow::fs::FileSystem::Equals;
94

95
    bool Equals(const arrow::fs::FileSystem &other) const override
×
96
    {
97
        const auto poOther = dynamic_cast<const VSIArrowFileSystem *>(&other);
×
98
        return poOther != nullptr &&
×
99
               poOther->m_osEnvVarPrefix == m_osEnvVarPrefix &&
×
100
               poOther->m_osQueryParameters == m_osQueryParameters;
×
101
    }
102

103
    using arrow::fs::FileSystem::GetFileInfo;
104

105
    arrow::Result<arrow::fs::FileInfo>
106
    GetFileInfo(const std::string &path) override
255✔
107
    {
108
        auto fileType = arrow::fs::FileType::Unknown;
255✔
109
        VSIStatBufL sStat;
110
        if (VSIStatL(path.c_str(), &sStat) == 0)
255✔
111
        {
112
            if (VSI_ISREG(sStat.st_mode))
255✔
113
                fileType = arrow::fs::FileType::File;
252✔
114
            else if (VSI_ISDIR(sStat.st_mode))
3✔
115
                fileType = arrow::fs::FileType::Directory;
3✔
116
        }
117
        else
118
        {
119
            fileType = arrow::fs::FileType::NotFound;
×
120
        }
121
        arrow::fs::FileInfo info(path, fileType);
510✔
122
        if (fileType == arrow::fs::FileType::File)
255✔
123
            info.set_size(sStat.st_size);
252✔
124
        return info;
510✔
125
    }
126

127
    arrow::Result<arrow::fs::FileInfoVector>
128
    GetFileInfo(const arrow::fs::FileSelector &select) override
3✔
129
    {
130
        arrow::fs::FileInfoVector res;
6✔
131
        VSIDIR *psDir = VSIOpenDir(select.base_dir.c_str(),
3✔
132
                                   select.recursive ? -1 : 0, nullptr);
3✔
133
        if (psDir == nullptr)
3✔
134
            return res;
×
135

136
        bool bParquetFound = false;
3✔
137
        const int nMaxNonParquetFiles = atoi(
3✔
138
            CPLGetConfigOption("OGR_PARQUET_MAX_NON_PARQUET_FILES", "100"));
139
        const int nMaxListedFiles =
140
            atoi(CPLGetConfigOption("OGR_PARQUET_MAX_LISTED_FILES", "1000000"));
3✔
141
        while (const auto psEntry = VSIGetNextDirEntry(psDir))
10✔
142
        {
143
            if (!bParquetFound)
7✔
144
                bParquetFound = EQUAL(
3✔
145
                    CPLGetExtensionSafe(psEntry->pszName).c_str(), "parquet");
146

147
            std::string osFilename = select.base_dir + '/' + psEntry->pszName;
7✔
148
            int nMode = psEntry->nMode;
7✔
149
            if (!psEntry->bModeKnown)
7✔
150
            {
151
                VSIStatBufL sStat;
152
                if (VSIStatL(osFilename.c_str(), &sStat) == 0)
×
153
                    nMode = sStat.st_mode;
×
154
            }
155

156
            auto fileType = arrow::fs::FileType::Unknown;
7✔
157
            if (VSI_ISREG(nMode))
7✔
158
                fileType = arrow::fs::FileType::File;
7✔
159
            else if (VSI_ISDIR(nMode))
×
160
                fileType = arrow::fs::FileType::Directory;
×
161

162
            arrow::fs::FileInfo info(std::move(osFilename), fileType);
7✔
163
            if (fileType == arrow::fs::FileType::File && psEntry->bSizeKnown)
7✔
164
            {
165
                info.set_size(psEntry->nSize);
7✔
166
            }
167
            res.push_back(std::move(info));
7✔
168

169
            if (m_osEnvVarPrefix == "PARQUET")
7✔
170
            {
171
                // Avoid iterating over too many files if there's no likely parquet
172
                // files.
173
                if (static_cast<int>(res.size()) == nMaxNonParquetFiles &&
7✔
174
                    !bParquetFound)
×
175
                    break;
×
176
                if (static_cast<int>(res.size()) == nMaxListedFiles)
7✔
177
                    break;
×
178
            }
179
        }
7✔
180
        VSICloseDir(psDir);
3✔
181
        return res;
3✔
182
    }
183

184
    arrow::Status CreateDir(const std::string & /*path*/,
×
185
                            bool /*recursive*/ = true) override
186
    {
187
        return arrow::Status::IOError("CreateDir() unimplemented");
×
188
    }
189

190
    arrow::Status DeleteDir(const std::string & /*path*/) override
×
191
    {
192
        return arrow::Status::IOError("DeleteDir() unimplemented");
×
193
    }
194

195
    arrow::Status DeleteDirContents(const std::string & /*path*/
×
196
#if ARROW_VERSION_MAJOR >= 8
197
                                    ,
198
                                    bool /*missing_dir_ok*/ = false
199
#endif
200
                                    ) override
201
    {
202
        return arrow::Status::IOError("DeleteDirContents() unimplemented");
×
203
    }
204

205
    arrow::Status DeleteRootDirContents() override
×
206
    {
207
        return arrow::Status::IOError("DeleteRootDirContents() unimplemented");
×
208
    }
209

210
    arrow::Status DeleteFile(const std::string & /*path*/) override
×
211
    {
212
        return arrow::Status::IOError("DeleteFile() unimplemented");
×
213
    }
214

215
    arrow::Status Move(const std::string & /*src*/,
×
216
                       const std::string & /*dest*/) override
217
    {
218
        return arrow::Status::IOError("Move() unimplemented");
×
219
    }
220

221
    arrow::Status CopyFile(const std::string & /*src*/,
×
222
                           const std::string & /*dest*/) override
223
    {
224
        return arrow::Status::IOError("CopyFile() unimplemented");
×
225
    }
226

227
    using arrow::fs::FileSystem::OpenInputStream;
228

229
    arrow::Result<std::shared_ptr<arrow::io::InputStream>>
230
    OpenInputStream(const std::string &path) override
×
231
    {
232
        return OpenInputFile(path);
×
233
    }
234

235
    using arrow::fs::FileSystem::OpenInputFile;
236

237
    arrow::Result<std::shared_ptr<arrow::io::RandomAccessFile>>
238
    OpenInputFile(const std::string &path) override
910✔
239
    {
240
        if (m_bAskedToClosed)
910✔
241
            return arrow::Status::IOError(
×
242
                "OpenInputFile(): file system in shutdown");
×
243

244
        std::string osPath(path);
1,820✔
245
        osPath += m_osQueryParameters;
910✔
246
        CPLDebugOnly(m_osEnvVarPrefix.c_str(), "Opening %s", osPath.c_str());
910✔
247
        auto fp = VSIVirtualHandleUniquePtr(VSIFOpenL(osPath.c_str(), "rb"));
1,820✔
248
        if (fp == nullptr)
910✔
249
            return arrow::Status::IOError("OpenInputFile() failed for " +
×
250
                                          osPath);
×
251
        auto poFile =
252
            std::make_shared<OGRArrowRandomAccessFile>(osPath, std::move(fp));
1,820✔
253
        {
254
            std::lock_guard oLock(m_oMutex);
1,820✔
255
            m_oSetFiles.emplace_back(path, poFile);
910✔
256
        }
257
        return poFile;
910✔
258
    }
259

260
    using arrow::fs::FileSystem::OpenOutputStream;
261

262
    arrow::Result<std::shared_ptr<arrow::io::OutputStream>>
263
    OpenOutputStream(const std::string & /*path*/,
×
264
                     const std::shared_ptr<const arrow::KeyValueMetadata>
265
                         & /* metadata */) override
266
    {
267
        return arrow::Status::IOError("OpenOutputStream() unimplemented");
×
268
    }
269

270
    arrow::Result<std::shared_ptr<arrow::io::OutputStream>>
271
    OpenAppendStream(const std::string & /*path*/,
×
272
                     const std::shared_ptr<const arrow::KeyValueMetadata>
273
                         & /* metadata */) override
274
    {
275
        return arrow::Status::IOError("OpenAppendStream() unimplemented");
×
276
    }
277
};
278

279
#if defined(__clang__)
280
#pragma clang diagnostic pop
281
#endif
282

283
#endif  // VSIARROWFILESYSTEM_HPP_INCLUDED
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc