• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

weimin96 / oss-spring-boot-starter / #5

06 Apr 2026 02:14PM UTC coverage: 42.506% (-21.1%) from 63.625%
#5

push

github

web-flow
Merge pull request #9 from weimin96/spring3

Spring3

52 of 509 new or added lines in 8 files covered. (10.22%)

536 of 1261 relevant lines covered (42.51%)

0.43 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

2.5
/oss-spring-boot3-starter/src/main/java/com/wiblog/oss/service/StreamUnzipOperations.java
1
package com.wiblog.oss.service;
2

3
import com.wiblog.oss.bean.ObjectInfo;
import com.wiblog.oss.bean.OssProperties;
import com.wiblog.oss.bean.UnzipResult;
import com.wiblog.oss.exception.OssException;
import com.wiblog.oss.util.Util;
import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.transfer.s3.S3TransferManager;

import java.io.*;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
22

23
/**
 * Streaming unzip operations.
 *
 * Decompresses a ZIP object through a {@link ZipInputStream} while it is being read from S3,
 * instead of first downloading the whole archive to local storage — reducing disk I/O for
 * large archives. Each ZIP entry is uploaded back to the target S3 path as soon as it has
 * been decompressed.
 *
 * <h3>Workflow</h3>
 * <pre>
 *   S3 ZIP object
 *       ↓  GetObject → InputStream
 *   ZipInputStream
 *       ↓  nextEntry() loop
 *   [each ZipEntry]
 *       ↓  read content into an in-memory buffer
 *   PutObject(target path)
 * </pre>
 *
 * <h3>Memory strategy</h3>
 * <ul>
 *   <li>Each entry is fully buffered in memory before upload, capped at
 *       {@code MAX_BUFFER_BYTES}; entries exceeding that cap are recorded as failed
 *       rather than extracted.</li>
 * </ul>
 *
 * @author panwm
 */
49
@Slf4j
1✔
50
public class StreamUnzipOperations extends Operations {
51

52
    /**
53
     * 流式读取时的 IO 缓冲区(64KB),平衡内存与吞吐。
54
     */
55
    private static final int BUFFER_SIZE = 64 * 1024;
56

57
    /**
58
     * 单个条目内存缓冲上限(128MB)。超过此大小的未知长度条目将写临时文件。
59
     */
60
    private static final int MAX_BUFFER_BYTES = 128 * 1024 * 1024;
61

62
    public StreamUnzipOperations(OssProperties ossProperties, S3AsyncClient client,
63
                                 S3TransferManager transferManager) {
64
        super(ossProperties, client, transferManager);
1✔
65
    }
1✔
66

67
    // ----------------------------------------------------------------
    // Public API
    // ----------------------------------------------------------------
70

71
    /**
72
     * 将 S3 中的 ZIP 对象流式解压到同一 Bucket 的目标路径。
73
     *
74
     * @param zipObjectKey ZIP 文件在 S3 中的 key
75
     * @param targetPath   解压后文件的目标路径前缀(结尾无需加 "/")
76
     * @return 解压结果,包含成功/失败列表
77
     */
78
    public UnzipResult unzip(String zipObjectKey, String targetPath) {
NEW
79
        return unzip(ossProperties.getBucketName(), zipObjectKey, ossProperties.getBucketName(), targetPath);
×
80
    }
81

82
    /**
83
     * 将源 Bucket 的 ZIP 对象流式解压到目标 Bucket 的指定路径。
84
     *
85
     * @param sourceBucket 源 Bucket
86
     * @param zipObjectKey ZIP 文件 key
87
     * @param targetBucket 目标 Bucket
88
     * @param targetPath   目标路径前缀
89
     * @return 解压结果
90
     */
91
    public UnzipResult unzip(String sourceBucket, String zipObjectKey,
92
                             String targetBucket, String targetPath) {
NEW
93
        String normalizedTargetPath = Util.formatPath(targetPath);
×
NEW
94
        log.info("Stream unzip: [{}/{}] → [{}/{}]",
×
95
                sourceBucket, zipObjectKey, targetBucket, normalizedTargetPath);
96

NEW
97
        List<ObjectInfo> succeeded = new ArrayList<>();
×
NEW
98
        List<String> failed = new ArrayList<>();
×
99

NEW
100
        try (InputStream s3Stream = fetchInputStream(sourceBucket, zipObjectKey);
×
NEW
101
             ZipInputStream zis = new ZipInputStream(new BufferedInputStream(s3Stream, BUFFER_SIZE))) {
×
102

103
            ZipEntry entry;
NEW
104
            while ((entry = zis.getNextEntry()) != null) {
×
NEW
105
                if (entry.isDirectory()) {
×
NEW
106
                    zis.closeEntry();
×
NEW
107
                    continue;
×
108
                }
NEW
109
                String entryName = entry.getName();
×
NEW
110
                String destKey = normalizedTargetPath + entryName;
×
111
                try {
NEW
112
                    ObjectInfo info = uploadEntry(zis, entry, targetBucket, destKey);
×
NEW
113
                    succeeded.add(info);
×
NEW
114
                    log.debug("Unzipped entry [{}] → [{}]", entryName, destKey);
×
NEW
115
                } catch (Exception ex) {
×
NEW
116
                    log.warn("Failed to unzip entry [{}]: {}", entryName, ex.getMessage(), ex);
×
NEW
117
                    failed.add(entryName);
×
118
                } finally {
NEW
119
                    zis.closeEntry();
×
120
                }
NEW
121
            }
×
NEW
122
        } catch (IOException e) {
×
NEW
123
            log.error("Stream unzip failed for [{}]: {}", zipObjectKey, e.getMessage(), e);
×
NEW
124
            throw new com.wiblog.oss.exception.OssException("UNZIP_ERROR",
×
NEW
125
                    "Stream unzip failed: " + e.getMessage(), e);
×
NEW
126
        }
×
127

NEW
128
        log.info("Stream unzip done: {} succeeded, {} failed", succeeded.size(), failed.size());
×
NEW
129
        return UnzipResult.builder()
×
NEW
130
                .targetPath(normalizedTargetPath)
×
NEW
131
                .succeeded(succeeded)
×
NEW
132
                .failed(failed)
×
NEW
133
                .build();
×
134
    }
135

136
    /**
137
     * 将 S3 中的 ZIP 对象流式解压,每个条目由调用方通过 {@link UnzipEntryHandler} 自定义处理。
138
     *
139
     * <p>适用于解压后写入本地磁盘、转发到 HTTP 响应等自定义场景。
140
     *
141
     * @param zipObjectKey ZIP 文件 key
142
     * @param handler      自定义条目处理器
143
     * @return 解压结果
144
     */
145
    public UnzipResult unzip(String zipObjectKey, UnzipEntryHandler handler) {
NEW
146
        return unzip(ossProperties.getBucketName(), zipObjectKey, handler);
×
147
    }
148

149
    /**
150
     * 将指定 Bucket 中的 ZIP 对象流式解压,条目由 handler 自定义处理。
151
     *
152
     * @param bucketName   源 Bucket
153
     * @param zipObjectKey ZIP 文件 key
154
     * @param handler      自定义条目处理器
155
     * @return 解压结果
156
     */
157
    public UnzipResult unzip(String bucketName, String zipObjectKey, UnzipEntryHandler handler) {
NEW
158
        log.info("Stream unzip with custom handler: [{}/{}]", bucketName, zipObjectKey);
×
159

NEW
160
        List<ObjectInfo> succeeded = new ArrayList<>();
×
NEW
161
        List<String> failed = new ArrayList<>();
×
162

NEW
163
        try (InputStream s3Stream = fetchInputStream(bucketName, zipObjectKey);
×
NEW
164
             ZipInputStream zis = new ZipInputStream(new BufferedInputStream(s3Stream, BUFFER_SIZE))) {
×
165

166
            ZipEntry entry;
NEW
167
            while ((entry = zis.getNextEntry()) != null) {
×
NEW
168
                if (entry.isDirectory()) {
×
NEW
169
                    zis.closeEntry();
×
NEW
170
                    continue;
×
171
                }
NEW
172
                String entryName = entry.getName();
×
173
                try {
174
                    // 用 NonClosingInputStream 包装,防止 handler 误关闭 zis
NEW
175
                    handler.handle(entry, new NonClosingInputStream(zis));
×
NEW
176
                    succeeded.add(ObjectInfo.builder()
×
NEW
177
                            .name(Util.getFilename(entryName))
×
NEW
178
                            .uri(entryName)
×
NEW
179
                            .size(entry.getSize() < 0 ? -1 : entry.getSize())
×
NEW
180
                            .build());
×
NEW
181
                    log.debug("Handled entry [{}]", entryName);
×
NEW
182
                } catch (Exception ex) {
×
NEW
183
                    log.warn("Handler failed for entry [{}]: {}", entryName, ex.getMessage(), ex);
×
NEW
184
                    failed.add(entryName);
×
185
                } finally {
NEW
186
                    zis.closeEntry();
×
187
                }
NEW
188
            }
×
NEW
189
        } catch (IOException e) {
×
NEW
190
            log.error("Stream unzip (custom handler) failed for [{}]: {}", zipObjectKey, e.getMessage(), e);
×
NEW
191
            throw new com.wiblog.oss.exception.OssException("UNZIP_ERROR",
×
NEW
192
                    "Stream unzip failed: " + e.getMessage(), e);
×
NEW
193
        }
×
194

NEW
195
        return UnzipResult.builder()
×
NEW
196
                .succeeded(succeeded)
×
NEW
197
                .failed(failed)
×
NEW
198
                .build();
×
199
    }
200

201
    /**
202
     * 仅解压 ZIP 包中指定路径前缀的条目到目标路径。
203
     *
204
     * @param zipObjectKey ZIP 文件 key
205
     * @param entryPrefix  只解压名称以此前缀开头的条目,传 null 或 "" 表示全部
206
     * @param targetPath   目标路径前缀
207
     * @return 解压结果
208
     */
209
    public UnzipResult unzipWithFilter(String zipObjectKey, String entryPrefix, String targetPath) {
NEW
210
        return unzipWithFilter(ossProperties.getBucketName(), zipObjectKey,
×
NEW
211
                ossProperties.getBucketName(), entryPrefix, targetPath);
×
212
    }
213

214
    /**
215
     * 带前缀过滤的跨 Bucket 流式解压。
216
     */
217
    public UnzipResult unzipWithFilter(String sourceBucket, String zipObjectKey,
218
                                       String targetBucket, String entryPrefix, String targetPath) {
NEW
219
        final String prefix = (entryPrefix == null) ? "" : entryPrefix;
×
NEW
220
        return unzip(sourceBucket, zipObjectKey, (entry, stream) -> {
×
NEW
221
            if (!entry.getName().startsWith(prefix)) {
×
NEW
222
                return; // 跳过不匹配的条目
×
223
            }
NEW
224
            String relName = prefix.isEmpty() ? entry.getName()
×
NEW
225
                    : entry.getName().substring(prefix.length());
×
NEW
226
            String destKey = Util.formatPath(targetPath) + relName;
×
NEW
227
            byte[] data = stream.readAllBytes();
×
NEW
228
            uploadBytes(data, targetBucket, destKey);
×
NEW
229
        });
×
230
    }
231

232
    // ----------------------------------------------------------------
    // Private implementation
    // ----------------------------------------------------------------
235

236
    /**
237
     * 从 S3 获取对象输入流。
238
     */
239
    private InputStream fetchInputStream(String bucketName, String objectKey) {
NEW
240
        GetObjectRequest req = GetObjectRequest.builder()
×
NEW
241
                .bucket(bucketName).key(objectKey).build();
×
NEW
242
        byte[] bytes = handleRequest(() ->
×
NEW
243
                client.getObject(req, AsyncResponseTransformer.toBytes())
×
NEW
244
                        .thenApply(rb -> {
×
NEW
245
                            ByteBuffer buf = rb.asByteBuffer();
×
NEW
246
                            byte[] b = new byte[buf.remaining()];
×
NEW
247
                            buf.get(b);
×
NEW
248
                            return b;
×
249
                        }));
NEW
250
        return new ByteArrayInputStream(bytes);
×
251
    }
252

253
    /**
254
     * 将 ZIP 条目内容上传到 S3 目标 key。
255
     *
256
     * <p>优先使用条目的已知大小({@code entry.getSize()})做精确传输;
257
     * 如果为 -1(未知大小,通常是 DEFLATED 压缩方法),则先读入内存再上传。</p>
258
     */
259
    private ObjectInfo uploadEntry(ZipInputStream zis, ZipEntry entry,
260
                                   String targetBucket, String destKey) throws IOException {
NEW
261
        long knownSize = entry.getSize();
×
NEW
262
        byte[] data = readAllBytes(zis);
×
NEW
263
        uploadBytes(data, targetBucket, destKey);
×
264

NEW
265
        long actualSize = knownSize >= 0 ? knownSize : data.length;
×
NEW
266
        return ObjectInfo.builder()
×
NEW
267
                .uri(destKey)
×
NEW
268
                .url(getDomain() + destKey)
×
NEW
269
                .name(Util.getFilename(destKey))
×
NEW
270
                .size(actualSize)
×
NEW
271
                .ext(Util.getExtension(destKey))
×
NEW
272
                .uploadTime(new Date())
×
NEW
273
                .build();
×
274
    }
275

276
    /**
277
     * 将字节数组上传到 S3。
278
     */
279
    private void uploadBytes(byte[] data, String bucket, String key) {
NEW
280
        PutObjectRequest putReq = PutObjectRequest.builder()
×
NEW
281
                .bucket(bucket)
×
NEW
282
                .key(key)
×
NEW
283
                .contentType(Util.getContentType(key))
×
NEW
284
                .contentLength((long) data.length)
×
NEW
285
                .build();
×
NEW
286
        handleRequest(() -> client.putObject(putReq, AsyncRequestBody.fromBytes(data)));
×
NEW
287
        log.debug("Uploaded unzipped entry: [{}/{}] ({} bytes)", bucket, key, data.length);
×
NEW
288
    }
×
289

290
    /**
291
     * 从流中读取全部字节(不关闭流)。
292
     */
293
    private static byte[] readAllBytes(InputStream in) throws IOException {
NEW
294
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
×
NEW
295
        byte[] buf = new byte[BUFFER_SIZE];
×
296
        int read;
NEW
297
        while ((read = in.read(buf)) != -1) {
×
NEW
298
            baos.write(buf, 0, read);
×
NEW
299
            if (baos.size() > MAX_BUFFER_BYTES) {
×
NEW
300
                throw new IOException("ZIP entry exceeds max buffer size: " + MAX_BUFFER_BYTES + " bytes");
×
301
            }
302
        }
NEW
303
        return baos.toByteArray();
×
304
    }
305

306
    /**
307
     * 包装输入流,禁止调用方调用 close(),防止误关闭 ZipInputStream。
308
     */
309
    private static class NonClosingInputStream extends FilterInputStream {
310
        NonClosingInputStream(InputStream in) {
NEW
311
            super(in);
×
NEW
312
        }
×
313

314
        @Override
315
        public void close() {
316
            // 刻意不关闭,由框架统一管理
NEW
317
        }
×
318
    }
319
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc