• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

weimin96 / oss-spring-boot-starter / #7

09 Apr 2026 08:26AM UTC coverage: 44.12% (+0.05%) from 44.066%
#7

push

github

panweimin
feat:分片上传优化

3 of 4 new or added lines in 1 file covered. (75.0%)

559 of 1267 relevant lines covered (44.12%)

0.44 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

58.68
/oss-spring-boot3-starter/src/main/java/com/wiblog/oss/service/PutOperations.java
1
package com.wiblog.oss.service;
2

3
import com.wiblog.oss.bean.ObjectInfo;
4
import com.wiblog.oss.bean.OssProperties;
5
import com.wiblog.oss.bean.chunk.Chunk;
6
import com.wiblog.oss.bean.chunk.ChunkMerge;
7
import com.wiblog.oss.bean.chunk.ChunkTarget;
8
import com.wiblog.oss.bean.chunk.ChunkTask;
9
import com.wiblog.oss.exception.OssException;
10
import com.wiblog.oss.util.Util;
11
import lombok.extern.slf4j.Slf4j;
12
import software.amazon.awssdk.core.async.AsyncRequestBody;
13
import software.amazon.awssdk.core.async.BlockingInputStreamAsyncRequestBody;
14
import software.amazon.awssdk.services.s3.S3AsyncClient;
15
import software.amazon.awssdk.services.s3.model.*;
16
import software.amazon.awssdk.transfer.s3.S3TransferManager;
17
import software.amazon.awssdk.transfer.s3.model.Upload;
18
import software.amazon.awssdk.transfer.s3.model.UploadDirectoryRequest;
19
import software.amazon.awssdk.transfer.s3.model.UploadFileRequest;
20
import software.amazon.awssdk.transfer.s3.model.UploadRequest;
21

22
import java.io.*;
23
import java.nio.ByteBuffer;
24
import java.nio.file.Paths;
25
import java.util.Comparator;
26
import java.util.Date;
27
import java.util.List;
28
import java.util.stream.Collectors;
29

30
/**
31
 * 上传操作
32
 *
33
 * @author panwm
34
 */
35
@Slf4j
1✔
36
public class PutOperations extends Operations {
37

38
    /**
     * Creates the upload helper.
     * <p>
     * All three arguments are passed unchanged to the {@code Operations} constructor.
     *
     * @param ossProperties   properties object handed to the parent class
     * @param client          asynchronous S3 client handed to the parent class
     * @param transferManager S3 transfer manager handed to the parent class
     */
    public PutOperations(OssProperties ossProperties, S3AsyncClient client, S3TransferManager transferManager) {
        super(ossProperties, client, transferManager);
    }
1✔
41

42
    // ----------------------------------------------------------------
43
    // Bucket 操作
44
    // ----------------------------------------------------------------
45

46
    /**
47
     * 创建 bucket(若已存在则跳过)
48
     */
49
    public void createBucket(String bucketName) {
50
        if (!bucketExists(bucketName)) {
×
51
            CreateBucketRequest req = CreateBucketRequest.builder().bucket(bucketName).build();
×
52
            handleRequest(() -> client.createBucket(req));
×
53
            log.info("Bucket [{}] created", bucketName);
×
54
        }
55
    }
×
56

57
    private boolean bucketExists(String bucketName) {
58
        try {
59
            client.headBucket(HeadBucketRequest.builder().bucket(bucketName).build()).join();
×
60
            return true;
×
61
        } catch (Exception e) {
×
62
            return false;
×
63
        }
64
    }
65

66
    // ----------------------------------------------------------------
67
    // 文件上传 - InputStream
68
    // ----------------------------------------------------------------
69

70
    public ObjectInfo putObject(String path, String filename, InputStream in) {
71
        return putObject(ossProperties.getBucketName(), path, filename, in);
1✔
72
    }
73

74
    public ObjectInfo putObject(String bucketName, String path, String filename, InputStream in) {
75
        return putObjectForKey(bucketName, formatPath(path) + filename, in);
1✔
76
    }
77

78
    public ObjectInfo putObjectForKey(String objectName, InputStream stream) {
79
        return putObjectForKey(ossProperties.getBucketName(), objectName, stream);
1✔
80
    }
81

82
    /**
83
     * 上传 InputStream。
84
     * <p>
85
     * 改进:原代码使用 stream.available() 获取大小(不可靠),
86
     * 现改为先将流读入缓冲区,用精确字节数上传,确保 Content-Length 正确。
87
     */
88
    public ObjectInfo putObjectForKey(String bucketName, String objectName, InputStream stream) {
89
        objectName = formatPath(objectName);
1✔
90
        // 先缓冲,获得精确长度
91
        byte[] data = toByteArray(stream);
1✔
92
        long fileSize = data.length;
1✔
93

94
        BlockingInputStreamAsyncRequestBody body = AsyncRequestBody.forBlockingInputStream(fileSize);
1✔
95
        PutObjectRequest putReq = PutObjectRequest.builder()
1✔
96
                .bucket(bucketName).key(objectName)
1✔
97
                .contentType(Util.getContentType(objectName))
1✔
98
                .build();
1✔
99
        UploadRequest uploadReq = UploadRequest.builder()
1✔
100
                .requestBody(body).putObjectRequest(putReq).build();
1✔
101

102
        Upload upload = transferManager.upload(uploadReq);
1✔
103
        body.writeInputStream(new ByteArrayInputStream(data));
1✔
104
        upload.completionFuture().join();
1✔
105

106
        return buildObjectInfo(objectName, new Date(), fileSize);
1✔
107
    }
108

109
    // ----------------------------------------------------------------
110
    // 文件上传 - File
111
    // ----------------------------------------------------------------
112

113
    public ObjectInfo putObject(String path, String filename, File file) {
114
        return putObject(ossProperties.getBucketName(), path, filename, file);
×
115
    }
116

117
    public ObjectInfo putObject(String bucketName, String path, String filename, File file) {
118
        return putObjectForKey(bucketName, formatPath(path) + filename, file);
×
119
    }
120

121
    public ObjectInfo putObjectForKey(String objectName, File file) {
122
        return putObjectForKey(ossProperties.getBucketName(), objectName, file);
×
123
    }
124

125
    public ObjectInfo putObjectForKey(String bucketName, String objectName, File file) {
126
        objectName = formatPath(objectName);
×
127
        PutObjectRequest putReq = PutObjectRequest.builder()
×
128
                .bucket(bucketName).key(objectName)
×
129
                .contentType(Util.getContentType(objectName))
×
130
                .build();
×
131
        UploadFileRequest uploadFileReq = UploadFileRequest.builder()
×
132
                .putObjectRequest(putReq).source(file).build();
×
133
        transferManager.uploadFile(uploadFileReq).completionFuture().join();
×
134
        return buildObjectInfo(objectName, new Date(), file.length());
×
135
    }
136

137
    // ----------------------------------------------------------------
138
    // 目录操作
139
    // ----------------------------------------------------------------
140

141
    public ObjectInfo mkdirs(String path) {
142
        return mkdirs(ossProperties.getBucketName(), path);
×
143
    }
144

145
    public ObjectInfo mkdirs(String bucketName, String path) {
146
        PutObjectRequest req = PutObjectRequest.builder()
×
147
                .bucket(bucketName).key(formatPath(path)).build();
×
148
        handleRequest(() -> client.putObject(req, AsyncRequestBody.empty()));
×
149
        return buildObjectInfo(path, new Date(), 0);
×
150
    }
151

152
    public void putFolder(String path, File folder) {
153
        putFolder(path, folder, true);
×
154
    }
×
155

156
    public void putFolder(String path, File folder, boolean isIncludeFolderName) {
157
        putFolder(ossProperties.getBucketName(), path, folder, isIncludeFolderName);
×
158
    }
×
159

160
    public void putFolder(String bucketName, String path, File folder, boolean isIncludeFolderName) {
161
        if (!folder.exists() || !folder.isDirectory()) {
×
162
            throw new IllegalArgumentException("目录不存在: " + folder.getPath());
×
163
        }
164
        path = formatPath(path);
×
165
        if (isIncludeFolderName) {
×
166
            path += folder.getName() + "/";
×
167
        }
168
        UploadDirectoryRequest req = UploadDirectoryRequest.builder()
×
169
                .source(Paths.get(folder.getAbsolutePath()))
×
170
                .s3Prefix(path).bucket(bucketName).build();
×
171
        transferManager.uploadDirectory(req).completionFuture().join();
×
172
    }
×
173

174
    // ----------------------------------------------------------------
175
    // 拷贝 / 移动
176
    // ----------------------------------------------------------------
177

178
    public void copyFile(String sourceKey, String destKey) {
179
        copyFile(ossProperties.getBucketName(), ossProperties.getBucketName(), sourceKey, destKey);
1✔
180
    }
1✔
181

182
    public void copyFile(String sourceBucket, String destBucket, String sourceKey, String destKey) {
183
        CopyObjectRequest req = CopyObjectRequest.builder()
1✔
184
                .sourceBucket(sourceBucket).sourceKey(formatPath(sourceKey))
1✔
185
                .destinationBucket(destBucket).destinationKey(formatPath(destKey))
1✔
186
                .build();
1✔
187
        handleRequest(() -> client.copyObject(req));
1✔
188
    }
1✔
189

190
    public void move(String sourceObjectName, String destinationDirectory) {
191
        move(ossProperties.getBucketName(), sourceObjectName, destinationDirectory);
1✔
192
    }
1✔
193

194
    public void move(String bucketName, String sourceObjectName, String destinationDirectory) {
195
        String filename = Util.getFilename(sourceObjectName);
1✔
196
        String destKey = Util.formatPath(destinationDirectory) + filename;
1✔
197
        copyFile(bucketName, bucketName, sourceObjectName, destKey);
1✔
198
        handleRequest(() -> client.deleteObject(x -> x.bucket(bucketName).key(formatPath(sourceObjectName)).build()));
1✔
199
    }
1✔
200

201
    // ----------------------------------------------------------------
202
    // 分片上传
203
    // ----------------------------------------------------------------
204

205
    public String initTask(ChunkTask chunkTask) {
206
        String objectName = formatPath(chunkTask.getPath()) + chunkTask.getFilename();
1✔
207
        CreateMultipartUploadResponse resp = client.createMultipartUpload(b -> b
1✔
208
                .bucket(ossProperties.getBucketName()).key(objectName)).join();
1✔
209
        return resp.uploadId();
1✔
210
    }
211

212
    public ChunkTarget chunk(Chunk chunk) {
213
        UploadPartRequest req = UploadPartRequest.builder()
1✔
214
                .bucket(ossProperties.getBucketName())
1✔
215
                .key(formatPath(chunk.getPath()) + chunk.getFilename())
1✔
216
                .uploadId(chunk.getUploadId())
1✔
217
                .partNumber(chunk.getChunkNumber())
1✔
218
                .contentLength(chunk.getFile().getSize())
1✔
219
                .build();
1✔
220
        try {
221
            ByteBuffer buf = ByteBuffer.wrap(chunk.getFile().getBytes());
1✔
222
            String etag = client.uploadPart(req, AsyncRequestBody.fromByteBuffer(buf)).join().eTag();
1✔
223
            ChunkTarget target = new ChunkTarget();
1✔
224
            target.setEtag(etag.replace("\"", ""));
1✔
225
            target.setPartNumber(chunk.getChunkNumber());
1✔
226
            return target;
1✔
227
        } catch (Exception e) {
×
228
            log.error("分片上传失败 file={} part={}", chunk.getFilename(), chunk.getChunkNumber(), e);
×
229
            throw OssException.uploadFailed(chunk.getFilename(), e);
×
230
        }
231
    }
232

233
    public ObjectInfo merge(ChunkMerge chunkMerge) {
234
        String objectName = formatPath(chunkMerge.getPath()) + chunkMerge.getFilename();
1✔
235
        // 处理 null 或空列表的情况
236
        List<ChunkTarget> chunkList = chunkMerge.getChunkTargetList();
1✔
237
        if (chunkList == null || chunkList.isEmpty()) {
1✔
NEW
238
            throw new IllegalArgumentException("分片列表不能为空,请确保所有分片已上传完成");
×
239
        }
240
        List<CompletedPart> parts = chunkList.stream()
1✔
241
                .map(p -> CompletedPart.builder().partNumber(p.getPartNumber()).eTag(p.getEtag()).build())
1✔
242
                .sorted(Comparator.comparingInt(CompletedPart::partNumber))
1✔
243
                .collect(Collectors.toList());
1✔
244

245
        client.completeMultipartUpload(b -> b
1✔
246
                .bucket(ossProperties.getBucketName()).key(objectName)
1✔
247
                .uploadId(chunkMerge.getUploadId())
1✔
248
                .multipartUpload(CompletedMultipartUpload.builder().parts(parts).build())).join();
1✔
249

250
        return ObjectInfo.builder()
1✔
251
                .uri(objectName).url(getDomain() + objectName)
1✔
252
                .name(Util.getFilename(objectName)).build();
1✔
253
    }
254

255
    public List<Part> listParts(String bucketName, String objectName, String uploadId) {
256
        ListPartsRequest req = ListPartsRequest.builder()
×
257
                .bucket(bucketName).key(objectName).uploadId(uploadId)
×
258
                .maxParts(Integer.MAX_VALUE).build();
×
259
        return client.listParts(req).join().parts();
×
260
    }
261

262
    // ----------------------------------------------------------------
263
    // 私有工具
264
    // ----------------------------------------------------------------
265

266
    /**
267
     * 将 InputStream 读入字节数组。
268
     * 改进:原代码使用 available()(不可靠),此处使用 ByteArrayOutputStream 完整读取。
269
     */
270
    private static byte[] toByteArray(InputStream stream) {
271
        try (ByteArrayOutputStream buffer = new ByteArrayOutputStream()) {
1✔
272
            byte[] buf = new byte[8192];
1✔
273
            int read;
274
            while ((read = stream.read(buf)) != -1) {
1✔
275
                buffer.write(buf, 0, read);
1✔
276
            }
277
            return buffer.toByteArray();
1✔
278
        } catch (IOException e) {
×
279
            throw new OssException("STREAM_READ_ERROR", "Failed to read input stream", e);
×
280
        }
281
    }
282
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc