• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3530

16 Nov 2024 07:44AM UTC coverage: 60.219% (-0.7%) from 60.888%
#3530

push

travis-ci

web-flow
Update 03-ad.md

118417 of 252124 branches covered (46.97%)

Branch coverage included in aggregate %.

198982 of 274951 relevant lines covered (72.37%)

6072359.98 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

23.91
/source/common/src/cos.c
1
#define ALLOW_FORBID_FUNC
2

3
#include "cos.h"
4
#include "cos_cp.h"
5
#include "tdef.h"
6
#include "tutil.h"
7

8
extern int8_t tsS3EpNum;
9
extern char   tsS3Endpoint[][TSDB_FQDN_LEN];
10
extern char   tsS3AccessKeyId[][TSDB_FQDN_LEN];
11
extern char   tsS3AccessKeySecret[][TSDB_FQDN_LEN];
12
extern char   tsS3BucketName[TSDB_FQDN_LEN];
13
extern char   tsS3AppId[][TSDB_FQDN_LEN];
14
extern char   tsS3Hostname[][TSDB_FQDN_LEN];
15
extern int8_t tsS3Https[];
16

17
static int32_t s3ListBucketByEp(char const *bucketname, int8_t epIndex);
18
static int32_t s3PutObjectFromFileOffsetByEp(const char *file, const char *object_name, int64_t offset, int64_t size,
19
                                             int8_t epIndex);
20
static int32_t s3DeleteObjectsByEp(const char *object_name[], int nobject, int8_t epIndex);
21
static SArray *getListByPrefixByEp(const char *prefix, int8_t epIndex);
22
static int32_t s3GetObjectBlockByEp(const char *object_name, int64_t offset, int64_t size, bool check,
23
                                    uint8_t **ppBlock, int8_t epIndex);
24
static int32_t s3GetObjectToFileByEp(const char *object_name, const char *fileName, int8_t epIndex);
25
static long    s3SizeByEp(const char *object_name, int8_t epIndex);
26

27
#if defined(USE_S3)
28

29
#include "libs3.h"
30
#include "tarray.h"
31

32
static int         verifyPeerG = 0;
33
static const char *awsRegionG = NULL;
34
static int         forceG = 0;
35
static int         showResponsePropertiesG = 0;
36
static S3Protocol  protocolG[TSDB_MAX_EP_NUM] = {S3ProtocolHTTPS};
37
//  static S3Protocol protocolG = S3ProtocolHTTP;
38
static S3UriStyle uriStyleG[TSDB_MAX_EP_NUM] = {S3UriStylePath};
39
static int        retriesG = 5;
40
static int        timeoutMsG = 0;
41

42
extern int8_t tsS3Oss[];
43

44
int32_t s3Begin() {
2,371✔
45
  S3Status    status;
46
  const char *hostname = tsS3Hostname[0];
2,371✔
47
  const char *env_hn = getenv("S3_HOSTNAME");
2,371✔
48

49
  if (env_hn) {
2,371!
50
    hostname = env_hn;
×
51
  }
52

53
  if ((status = S3_initialize("s3", verifyPeerG | S3_INIT_ALL, hostname)) != S3StatusOK) {
2,371!
54
    uError("Failed to initialize libs3: %s\n", S3_get_status_name(status));
×
55
    TAOS_RETURN(TSDB_CODE_FAILED);
×
56
  }
57

58
  for (int i = 0; i < tsS3EpNum; i++) {
4,742✔
59
    protocolG[i] = tsS3Https[i] ? S3ProtocolHTTPS : S3ProtocolHTTP;
2,371✔
60
    uriStyleG[i] = tsS3Oss[i] ? S3UriStyleVirtualHost : S3UriStylePath;
2,371✔
61
  }
62

63
  TAOS_RETURN(TSDB_CODE_SUCCESS);
2,371✔
64
}
65

66
// Releases libs3 global state; counterpart of s3Begin().
void s3End() {
  S3_deinitialize();
}
2,371✔
67

68
// Currently a no-op that reports success; the call to s3Begin() is disabled.
int32_t s3Init() {
  /* s3Begin(); */
  TAOS_RETURN(TSDB_CODE_SUCCESS);
}
×
69

70
static int32_t s3ListBucket(char const *bucketname);
71

72
static void s3DumpCfgByEp(int8_t epIndex) {
1✔
73
  // clang-format off
74
  (void)fprintf(stdout,
2✔
75
                "%-24s %s\n"
76
                "%-24s %s\n"
77
                "%-24s %s\n"
78
                "%-24s %s\n"
79
                "%-24s %s\n"
80
                "%-24s %s\n",
81
                "hostName", tsS3Hostname[epIndex],
1✔
82
                "bucketName", tsS3BucketName,
83
                "protocol", (protocolG[epIndex] == S3ProtocolHTTPS ? "https" : "http"),
1!
84
                "uristyle", (uriStyleG[epIndex] == S3UriStyleVirtualHost ? "virtualhost" : "path"),
1✔
85
                "accessKey", tsS3AccessKeyId[epIndex],
1✔
86
                "accessKeySecret", tsS3AccessKeySecret[epIndex]);
1!
87
  // clang-format on
88
}
1✔
89

90
int32_t s3CheckCfg() {
1✔
91
  int32_t code = 0, lino = 0;
1✔
92

93
  for (int8_t i = 0; i < tsS3EpNum; i++) {
2✔
94
    (void)fprintf(stdout, "test s3 ep (%d/%d):\n", i + 1, tsS3EpNum);
1✔
95
    s3DumpCfgByEp(i);
1✔
96

97
    // test put
98
    char        testdata[17] = "0123456789abcdef";
1✔
99
    const char *objectname[] = {"s3test.txt"};
1✔
100
    char        path[PATH_MAX] = {0};
1✔
101
    int         ds_len = strlen(TD_DIRSEP);
1✔
102
    int         tmp_len = strlen(tsTempDir);
1✔
103

104
    (void)snprintf(path, PATH_MAX, "%s", tsTempDir);
1✔
105
    if (strncmp(tsTempDir + tmp_len - ds_len, TD_DIRSEP, ds_len) != 0) {
1!
106
      (void)snprintf(path + tmp_len, PATH_MAX - tmp_len, "%s", TD_DIRSEP);
×
107
      (void)snprintf(path + tmp_len + ds_len, PATH_MAX - tmp_len - ds_len, "%s", objectname[0]);
×
108
    } else {
109
      (void)snprintf(path + tmp_len, PATH_MAX - tmp_len, "%s", objectname[0]);
1✔
110
    }
111

112
    TdFilePtr fp = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC);
1✔
113
    if (!fp) {
1!
114
      (void)fprintf(stderr, "failed to open test file: %s.\n", path);
×
115
      // uError("ERROR: %s Failed to open %s", __func__, path);
116
      TAOS_CHECK_GOTO(terrno, &lino, _next);
×
117
    }
118
    if (taosWriteFile(fp, testdata, strlen(testdata)) < 0) {
1!
119
      (void)fprintf(stderr, "failed to write test file: %s.\n", path);
×
120
      TAOS_CHECK_GOTO(terrno, &lino, _next);
×
121
    }
122
    if (taosFsyncFile(fp) < 0) {
1!
123
      (void)fprintf(stderr, "failed to fsync test file: %s.\n", path);
×
124
      TAOS_CHECK_GOTO(terrno, &lino, _next);
×
125
    }
126
    (void)taosCloseFile(&fp);
1✔
127

128
    (void)fprintf(stderr, "\nstart to put object: %s, file: %s content: %s\n", objectname[0], path, testdata);
1✔
129
    code = s3PutObjectFromFileOffsetByEp(path, objectname[0], 0, 16, i);
1✔
130
    if (code != 0) {
1!
131
      (void)fprintf(stderr, "put object %s : failed.\n", objectname[0]);
×
132
      TAOS_CHECK_GOTO(code, &lino, _next);
×
133
    }
134
    (void)fprintf(stderr, "put object %s: success.\n\n", objectname[0]);
1✔
135

136
    // list buckets
137
    (void)fprintf(stderr, "start to list bucket %s by prefix s3.\n", tsS3BucketName);
1✔
138
    code = s3ListBucketByEp(tsS3BucketName, i);
1✔
139
    if (code != 0) {
1!
140
      (void)fprintf(stderr, "listing bucket %s : failed.\n", tsS3BucketName);
×
141
      TAOS_CHECK_GOTO(code, &lino, _next);
×
142
    }
143
    (void)fprintf(stderr, "listing bucket %s: success.\n\n", tsS3BucketName);
1✔
144

145
    // test range get
146
    uint8_t *pBlock = NULL;
1✔
147
    int      c_offset = 10;
1✔
148
    int      c_len = 6;
1✔
149

150
    (void)fprintf(stderr, "start to range get object %s offset: %d len: %d.\n", objectname[0], c_offset, c_len);
1✔
151
    code = s3GetObjectBlockByEp(objectname[0], c_offset, c_len, true, &pBlock, i);
1✔
152
    if (code != 0) {
1!
153
      (void)fprintf(stderr, "get object %s : failed.\n", objectname[0]);
×
154
      TAOS_CHECK_GOTO(code, &lino, _next);
×
155
    }
156
    char buf[7] = {0};
1✔
157
    (void)memcpy(buf, pBlock, c_len);
1✔
158
    taosMemoryFree(pBlock);
1✔
159
    (void)fprintf(stderr, "object content: %s\n", buf);
1✔
160
    (void)fprintf(stderr, "get object %s: success.\n\n", objectname[0]);
1✔
161

162
    // delete test object
163
    (void)fprintf(stderr, "start to delete object: %s.\n", objectname[0]);
1✔
164
    code = s3DeleteObjectsByEp(objectname, 1, i);
1✔
165
    if (code != 0) {
1!
166
      (void)fprintf(stderr, "delete object %s : failed.\n", objectname[0]);
×
167
      TAOS_CHECK_GOTO(code, &lino, _next);
×
168
    }
169
    (void)fprintf(stderr, "delete object %s: success.\n\n", objectname[0]);
1✔
170

171
  _next:
1✔
172
    if (fp) {
1!
173
      (void)taosCloseFile(&fp);
×
174
    }
175

176
    if (TSDB_CODE_SUCCESS != code) {
1!
177
      (void)fprintf(stderr, "s3 check failed, code: %d, line: %d, index: %d.\n", code, lino, i);
×
178
    }
179

180
    (void)fprintf(stdout, "=================================================================\n");
1✔
181
  }
182

183
  // s3End();
184

185
  TAOS_RETURN(code);
1✔
186
}
187

188
static int should_retry() {
×
189
  /*
190
  if (retriesG--) {
191
    // Sleep before next retry; start out with a 1 second sleep
192
    static int retrySleepInterval = 1 * SLEEP_UNITS_PER_SECOND;
193
    sleep(retrySleepInterval);
194
    // Next sleep 1 second longer
195
    retrySleepInterval++;
196
    return 1;
197
  }
198
  */
199

200
  TAOS_RETURN(TSDB_CODE_SUCCESS);
×
201
}
202

203
// Logs a failed libs3 request through uError().
//
// filename / lineno / funcname identify the call site; `status` is the libs3
// status of the request. `error_details` (the text collected by
// responseCompleteCallback) is appended only for statuses at or above
// S3StatusErrorAccessDenied.
//
// Any of the string arguments may be NULL (s3PutObjectFromFileSimple passes a
// NULL filename); NULL is rendered as "(null)" instead of being handed to a
// "%s" conversion, which would be undefined behaviour.
static void s3PrintError(const char *filename, int lineno, const char *funcname, S3Status status,
                         char error_details[]) {
  const char *file = filename ? filename : "(null)";
  const char *func = funcname ? funcname : "(null)";

  if (status < S3StatusErrorAccessDenied) {
    uError("%s/%s:%d-%s: %s", __func__, file, lineno, func, S3_get_status_name(status));
  } else {
    const char *details = error_details ? error_details : "(null)";
    uError("%s/%s:%d-%s: %s, %s", __func__, file, lineno, func, S3_get_status_name(status), details);
  }
}
×
211

212
typedef struct {
213
  char      err_msg[512];
214
  S3Status  status;
215
  uint64_t  content_length;
216
  TdFilePtr file;
217
} TS3GetData;
218

219
typedef struct {
220
  char     err_msg[512];
221
  S3Status status;
222
  uint64_t content_length;
223
  char    *buf;
224
  int64_t  buf_pos;
225
} TS3SizeCBD;
226

227
static S3Status responsePropertiesCallbackNull(const S3ResponseProperties *properties, void *callbackData) {
2✔
228
  //  (void)callbackData;
229
  return S3StatusOK;
2✔
230
}
231

232
static S3Status responsePropertiesCallback(const S3ResponseProperties *properties, void *callbackData) {
1✔
233
  TS3SizeCBD *cbd = callbackData;
1✔
234
  if (properties->contentLength > 0) {
1!
235
    cbd->content_length = properties->contentLength;
1✔
236
  } else {
237
    cbd->content_length = 0;
×
238
  }
239

240
  return S3StatusOK;
1✔
241
}
242

243
// Request-complete callback shared by all requests in this file.
//
// Stores the final status in the context and, when libs3 supplies error
// details, formats them (message, resource, further details, extra details)
// into err_msg, never writing past the 512-byte buffer.
//
// callbackData is accessed as TS3SizeCBD; the callers in this file pass other
// context structs that begin with the same err_msg/status/content_length
// members.
static void responseCompleteCallback(S3Status status, const S3ErrorDetails *error, void *callbackData) {
  TS3SizeCBD *ctx = callbackData;
  ctx->status = status;

  if (!error) {
    return;
  }

  char     *msg = ctx->err_msg;
  const int cap = sizeof(ctx->err_msg);
  int       used = 0;

  if (error->message && used < cap) {
    used += tsnprintf(msg + used, cap - used, "  Message: %s\n", error->message);
  }
  if (error->resource && used < cap) {
    used += tsnprintf(msg + used, cap - used, "  Resource: %s\n", error->resource);
  }
  if (error->furtherDetails && used < cap) {
    used += tsnprintf(msg + used, cap - used, "  Further Details: %s\n", error->furtherDetails);
  }
  if (error->extraDetailsCount && used < cap) {
    used += tsnprintf(msg + used, cap - used, "%s", "  Extra Details:\n");
    for (int k = 0; k < error->extraDetailsCount; k++) {
      if (used >= cap) {
        break;  // buffer exhausted; the remaining details cannot be stored
      }
      used += tsnprintf(msg + used, cap - used, "    %s: %s\n", error->extraDetails[k].name,
                        error->extraDetails[k].value);
    }
  }
}
4✔
270

271
static SArray *getListByPrefix(const char *prefix);
272
static void    s3FreeObjectKey(void *pItem);
273

274
static int32_t s3ListBucketByEp(char const *bucketname, int8_t epIndex) {
1✔
275
  int32_t code = 0;
1✔
276

277
  SArray *objectArray = getListByPrefixByEp("s3", epIndex);
1✔
278
  if (objectArray == NULL) {
1!
279
    TAOS_RETURN(TSDB_CODE_FAILED);
×
280
  }
281

282
  const char **object_name = TARRAY_DATA(objectArray);
1✔
283
  int          size = TARRAY_SIZE(objectArray);
1✔
284

285
  (void)fprintf(stderr, "objects:\n");
1✔
286
  for (int i = 0; i < size; ++i) {
2✔
287
    (void)fprintf(stderr, "%s\n", object_name[i]);
1✔
288
  }
289

290
  taosArrayDestroyEx(objectArray, s3FreeObjectKey);
1✔
291

292
  TAOS_RETURN(code);
1✔
293
}
294

295
static int32_t s3ListBucket(char const *bucketname) {
×
296
  int32_t code = 0;
×
297

298
  SArray *objectArray = getListByPrefix("s3");
×
299
  if (objectArray == NULL) {
×
300
    TAOS_RETURN(TSDB_CODE_FAILED);
×
301
  }
302

303
  const char **object_name = TARRAY_DATA(objectArray);
×
304
  int          size = TARRAY_SIZE(objectArray);
×
305

306
  (void)fprintf(stderr, "objects:\n");
×
307
  for (int i = 0; i < size; ++i) {
×
308
    (void)fprintf(stderr, "%s\n", object_name[i]);
×
309
  }
310

311
  taosArrayDestroyEx(objectArray, s3FreeObjectKey);
×
312

313
  TAOS_RETURN(code);
×
314
}
315

316
// A byte FIFO made of fixed-size chunks linked into a circular doubly-linked
// list. The list handle (`growbuffer *`) points at the oldest chunk; the
// newest chunk is reachable as handle->prev. A NULL handle is an empty FIFO.
typedef struct growbuffer {
  // Number of unread bytes stored in this chunk
  int size;
  // Offset inside data of the first unread byte
  int start;
  // Payload storage of this chunk
  char               data[64 * 1024];
  // Neighbours in the circular list
  struct growbuffer *prev, *next;
} growbuffer;

// Appends dataLen bytes from data to the FIFO *gb, allocating chunks with
// malloc() as needed.
//
// Returns dataLen on success. Returns 0 when dataLen is not positive (nothing
// is appended) or when a chunk allocation fails; in the latter case the bytes
// copied before the failure remain in the FIFO.
//
// New bytes are written after the unread region (start + size), so appending
// to a chunk that has already been partially read does not overwrite unread
// data.
static int growbuffer_append(growbuffer **gb, const char *data, int dataLen) {
  if (dataLen <= 0) {
    return 0;
  }

  int origDataLen = dataLen;
  while (dataLen > 0) {
    growbuffer *tail = *gb ? (*gb)->prev : NULL;
    if (!tail || (size_t)(tail->start + tail->size) == sizeof(tail->data)) {
      // No chunk yet, or the newest chunk has no room left: add a new one.
      tail = malloc(sizeof *tail);
      if (!tail) {
        return 0;
      }
      tail->size = 0;
      tail->start = 0;
      if (*gb && (*gb)->prev) {
        tail->prev = (*gb)->prev;
        tail->next = *gb;
        (*gb)->prev->next = tail;
        (*gb)->prev = tail;
      } else {
        tail->prev = tail->next = tail;
        *gb = tail;
      }
    }

    int used = tail->start + tail->size;
    int toCopy = (int)sizeof(tail->data) - used;
    if (toCopy > dataLen) {
      toCopy = dataLen;
    }

    (void)memcpy(&(tail->data[used]), data, toCopy);

    tail->size += toCopy;
    data += toCopy;
    dataLen -= toCopy;
  }

  return origDataLen;
}

// Moves up to amt bytes from the oldest chunk of *gb into buffer and stores
// the number of bytes moved in *amtReturn (0 when the FIFO is empty or amt is
// not positive). A single call never crosses a chunk boundary, so fewer than
// amt bytes may be returned even when more data is queued. A chunk that
// becomes empty is unlinked and freed; *gb becomes NULL when it was the last.
static void growbuffer_read(growbuffer **gb, int amt, int *amtReturn, char *buffer) {
  *amtReturn = 0;

  growbuffer *head = *gb;

  if (!head || amt <= 0) {
    return;
  }

  int n = (head->size > amt) ? amt : head->size;

  (void)memcpy(buffer, &(head->data[head->start]), n);

  head->start += n;
  head->size -= n;
  *amtReturn = n;

  if (head->size == 0) {
    if (head->next == head) {
      *gb = NULL;
    } else {
      *gb = head->next;
      head->prev->next = head->next;
      head->next->prev = head->prev;
    }
    free(head);
  }
}

// Frees every chunk of the FIFO. Accepts NULL. The caller's handle is not
// reset and must not be used afterwards.
static void growbuffer_destroy(growbuffer *gb) {
  growbuffer *start = gb;

  while (gb) {
    growbuffer *next = gb->next;
    free(gb);
    gb = (next == start) ? NULL : next;
  }
}
×
399

400
typedef struct put_object_callback_data {
401
  char     err_msg[512];
402
  S3Status status;
403
  uint64_t content_length;
404
  // FILE       *infile;
405
  TdFilePtr   infileFD;
406
  growbuffer *gb;
407
  uint64_t    contentLength, originalContentLength;
408
  uint64_t    totalContentLength, totalOriginalContentLength;
409
  int         noStatus;
410
} put_object_callback_data;
411

412
#define MULTIPART_CHUNK_SIZE (64 << 20)  // 64 MiB; the multipart uploaders use MULTIPART_CHUNK_SIZE >> 3 (8 MiB) as the default part size
413

414
typedef struct {
415
  char     err_msg[512];
416
  S3Status status;
417
  uint64_t content_length;
418
  // used for initial multipart
419
  char *upload_id;
420

421
  // used for upload part object
422
  char **etags;
423
  int    next_etags_pos;
424

425
  // used for commit Upload
426
  growbuffer *gb;
427
  int         remaining;
428
} UploadManager;
429

430
typedef struct list_parts_callback_data {
431
  char           err_msg[512];
432
  S3Status       status;
433
  uint64_t       content_length;
434
  int            isTruncated;
435
  char           nextPartNumberMarker[24];
436
  char           initiatorId[256];
437
  char           initiatorDisplayName[256];
438
  char           ownerId[256];
439
  char           ownerDisplayName[256];
440
  char           storageClass[256];
441
  int            partsCount;
442
  int            handlePartsStart;
443
  int            allDetails;
444
  int            noPrint;
445
  UploadManager *manager;
446
} list_parts_callback_data;
447

448
typedef struct MultipartPartData {
449
  put_object_callback_data put_object_data;
450
  int                      seq;
451
  UploadManager           *manager;
452
} MultipartPartData;
453

454
static int putObjectDataCallback(int bufferSize, char *buffer, void *callbackData) {
1✔
455
  put_object_callback_data *data = (put_object_callback_data *)callbackData;
1✔
456
  /*
457
  if (data->infileFD == 0) {
458
    MultipartPartData *mpd = (MultipartPartData *)callbackData;
459
    data = &mpd->put_object_data;
460
  }
461
  */
462
  int ret = 0;
1✔
463

464
  if (data->contentLength) {
1!
465
    int toRead = ((data->contentLength > (unsigned)bufferSize) ? (unsigned)bufferSize : data->contentLength);
1✔
466
    if (data->gb) {
1!
467
      growbuffer_read(&(data->gb), toRead, &ret, buffer);
×
468
    } else if (data->infileFD) {
1!
469
      // ret = fread(buffer, 1, toRead, data->infile);
470
      ret = taosReadFile(data->infileFD, buffer, toRead);
1✔
471
    }
472
  }
473

474
  data->contentLength -= ret;
1✔
475
  data->totalContentLength -= ret;
1✔
476
  /* log too many open files
477
  if (data->contentLength && !data->noStatus) {
478
    vTrace("%llu bytes remaining ", (unsigned long long)data->totalContentLength);
479
    vTrace("(%d%% complete) ...\n", (int)(((data->totalOriginalContentLength - data->totalContentLength) * 100) /
480
                                          data->totalOriginalContentLength));
481
  }
482
  */
483
  return ret;
1✔
484
}
485

486
S3Status initial_multipart_callback(const char *upload_id, void *callbackData) {
×
487
  UploadManager *manager = (UploadManager *)callbackData;
×
488
  manager->upload_id = strdup(upload_id);
×
489
  manager->status = S3StatusOK;
×
490
  return S3StatusOK;
×
491
}
492

493
S3Status MultipartResponseProperiesCallback(const S3ResponseProperties *properties, void *callbackData) {
×
494
  if (S3StatusOK != responsePropertiesCallbackNull(properties, callbackData)) {
×
495
    uError("%s failed at line %d to process null callback.", __func__, __LINE__);
×
496
  }
497

498
  MultipartPartData *data = (MultipartPartData *)callbackData;
×
499
  int                seq = data->seq;
×
500
  const char        *etag = properties->eTag;
×
501
  data->manager->etags[seq - 1] = strdup(etag);
×
502
  data->manager->next_etags_pos = seq;
×
503
  return S3StatusOK;
×
504
}
505

506
S3Status MultipartResponseProperiesCallbackWithCp(const S3ResponseProperties *properties, void *callbackData) {
×
507
  if (S3StatusOK != responsePropertiesCallbackNull(properties, callbackData)) {
×
508
    uError("%s failed at line %d to process null callback.", __func__, __LINE__);
×
509
  }
510

511
  MultipartPartData *data = (MultipartPartData *)callbackData;
×
512
  int                seq = data->seq;
×
513
  const char        *etag = properties->eTag;
×
514
  data->manager->etags[seq - 1] = strdup(etag);
×
515
  // data->manager->next_etags_pos = seq;
516
  return S3StatusOK;
×
517
}
518

519
static int multipartPutXmlCallback(int bufferSize, char *buffer, void *callbackData) {
×
520
  UploadManager *manager = (UploadManager *)callbackData;
×
521
  int            ret = 0;
×
522

523
  if (manager->remaining) {
×
524
    int toRead = ((manager->remaining > bufferSize) ? bufferSize : manager->remaining);
×
525
    growbuffer_read(&(manager->gb), toRead, &ret, buffer);
×
526
  }
527
  manager->remaining -= ret;
×
528
  return ret;
×
529
}
530
/*
531
static S3Status listPartsCallback(int isTruncated, const char *nextPartNumberMarker, const char *initiatorId,
532
                                  const char *initiatorDisplayName, const char *ownerId, const char *ownerDisplayName,
533
                                  const char *storageClass, int partsCount, int handlePartsStart,
534
                                  const S3ListPart *parts, void *callbackData) {
535
  list_parts_callback_data *data = (list_parts_callback_data *)callbackData;
536

537
  data->isTruncated = isTruncated;
538
  data->handlePartsStart = handlePartsStart;
539
  UploadManager *manager = data->manager;
540

541
  if (nextPartNumberMarker) {
542
    snprintf(data->nextPartNumberMarker, sizeof(data->nextPartNumberMarker), "%s", nextPartNumberMarker);
543
  } else {
544
    data->nextPartNumberMarker[0] = 0;
545
  }
546

547
  if (initiatorId) {
548
    snprintf(data->initiatorId, sizeof(data->initiatorId), "%s", initiatorId);
549
  } else {
550
    data->initiatorId[0] = 0;
551
  }
552

553
  if (initiatorDisplayName) {
554
    snprintf(data->initiatorDisplayName, sizeof(data->initiatorDisplayName), "%s", initiatorDisplayName);
555
  } else {
556
    data->initiatorDisplayName[0] = 0;
557
  }
558

559
  if (ownerId) {
560
    snprintf(data->ownerId, sizeof(data->ownerId), "%s", ownerId);
561
  } else {
562
    data->ownerId[0] = 0;
563
  }
564

565
  if (ownerDisplayName) {
566
    snprintf(data->ownerDisplayName, sizeof(data->ownerDisplayName), "%s", ownerDisplayName);
567
  } else {
568
    data->ownerDisplayName[0] = 0;
569
  }
570

571
  if (storageClass) {
572
    snprintf(data->storageClass, sizeof(data->storageClass), "%s", storageClass);
573
  } else {
574
    data->storageClass[0] = 0;
575
  }
576

577
  if (partsCount && !data->partsCount && !data->noPrint) {
578
    // printListPartsHeader();
579
  }
580

581
  int i;
582
  for (i = 0; i < partsCount; i++) {
583
    const S3ListPart *part = &(parts[i]);
584
    char              timebuf[256];
585
    if (data->noPrint) {
586
      manager->etags[handlePartsStart + i] = strdup(part->eTag);
587
      manager->next_etags_pos++;
588
      manager->remaining = manager->remaining - part->size;
589
    } else {
590
      time_t t = (time_t)part->lastModified;
591
      strftime(timebuf, sizeof(timebuf), "%Y-%m-%dT%H:%M:%SZ", gmtime(&t));
592
      printf("%-30s", timebuf);
593
      printf("%-15llu", (unsigned long long)part->partNumber);
594
      printf("%-45s", part->eTag);
595
      printf("%-15llu\n", (unsigned long long)part->size);
596
    }
597
  }
598

599
  data->partsCount += partsCount;
600

601
  return S3StatusOK;
602
}
603

604
static int try_get_parts_info(const char *bucketName, const char *key, UploadManager *manager) {
605
  S3BucketContext bucketContext = {0, tsS3BucketName, protocolG, uriStyleG, tsS3AccessKeyId, tsS3AccessKeySecret,
606
                                   0, awsRegionG};
607

608
  S3ListPartsHandler listPartsHandler = {{&responsePropertiesCallbackNull, &responseCompleteCallback},
609
&listPartsCallback};
610

611
  list_parts_callback_data data;
612

613
  memset(&data, 0, sizeof(list_parts_callback_data));
614

615
  data.partsCount = 0;
616
  data.allDetails = 0;
617
  data.manager = manager;
618
  data.noPrint = 1;
619
  do {
620
    data.isTruncated = 0;
621
    do {
622
      S3_list_parts(&bucketContext, key, data.nextPartNumberMarker, manager->upload_id, 0, 0, 0, timeoutMsG,
623
                    &listPartsHandler, &data);
624
    } while (S3_status_is_retryable(data.status) && should_retry());
625
    if (data.status != S3StatusOK) {
626
      break;
627
    }
628
  } while (data.isTruncated);
629

630
  if (data.status == S3StatusOK) {
631
    if (!data.partsCount) {
632
      // printListMultipartHeader(data.allDetails);
633
    }
634
  } else {
635
    s3PrintError(__FILE__, __LINE__, __func__, data.status, data.err_msg);
636
    return -1;
637
  }
638

639
  return 0;
640
}
641
*/
642

643
static int32_t s3PutObjectFromFileSimple(S3BucketContext *bucket_context, char const *object_name, int64_t size,
1✔
644
                                         S3PutProperties *put_prop, put_object_callback_data *data) {
645
  int32_t            code = 0;
1✔
646
  S3PutObjectHandler putObjectHandler = {{&responsePropertiesCallbackNull, &responseCompleteCallback},
1✔
647
                                         &putObjectDataCallback};
648

649
  do {
650
    S3_put_object(bucket_context, object_name, size, put_prop, 0, 0, &putObjectHandler, data);
1✔
651
  } while (S3_status_is_retryable(data->status) && should_retry());
1!
652

653
  if (data->status != S3StatusOK) {
1!
654
    // s3PrintError(__FILE__, __LINE__, __func__, data->status, data->err_msg);
655
    s3PrintError(NULL, __LINE__, __func__, data->status, data->err_msg);
×
656
    code = TAOS_SYSTEM_ERROR(EIO);
×
657
  } else if (data->contentLength) {
1!
658
    uError("%s Failed to put remaining %llu bytes", __func__, (unsigned long long)data->contentLength);
×
659
    code = TAOS_SYSTEM_ERROR(EIO);
×
660
  }
661

662
  TAOS_RETURN(code);
1✔
663
}
664

665
static int32_t s3PutObjectFromFileWithoutCp(S3BucketContext *bucket_context, char const *object_name,
×
666
                                            int64_t contentLength, S3PutProperties *put_prop,
667
                                            put_object_callback_data *data) {
668
  int32_t       code = 0, lino = 0;
×
669
  uint64_t      totalContentLength = contentLength;
×
670
  uint64_t      todoContentLength = contentLength;
×
671
  UploadManager manager = {0};
×
672

673
  uint64_t  chunk_size = MULTIPART_CHUNK_SIZE >> 3;
×
674
  int       totalSeq = (contentLength + chunk_size - 1) / chunk_size;
×
675
  const int max_part_num = 10000;
×
676
  if (totalSeq > max_part_num) {
×
677
    chunk_size = (contentLength + max_part_num - contentLength % max_part_num) / max_part_num;
×
678
    totalSeq = (contentLength + chunk_size - 1) / chunk_size;
×
679
  }
680

681
  MultipartPartData partData;
682
  (void)memset(&partData, 0, sizeof(MultipartPartData));
×
683
  int partContentLength = 0;
×
684

685
  S3MultipartInitialHandler handler = {{&responsePropertiesCallbackNull, &responseCompleteCallback},
×
686
                                       &initial_multipart_callback};
687

688
  S3PutObjectHandler putObjectHandler = {{&MultipartResponseProperiesCallback, &responseCompleteCallback},
×
689
                                         &putObjectDataCallback};
690

691
  S3MultipartCommitHandler commit_handler = {
×
692
      {&responsePropertiesCallbackNull, &responseCompleteCallback}, &multipartPutXmlCallback, 0};
693

694
  manager.etags = (char **)taosMemoryCalloc(totalSeq, sizeof(char *));
×
695
  if (!manager.etags) {
×
696
    TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
697
  }
698
  manager.next_etags_pos = 0;
×
699
  do {
700
    S3_initiate_multipart(bucket_context, object_name, 0, &handler, 0, timeoutMsG, &manager);
×
701
  } while (S3_status_is_retryable(manager.status) && should_retry());
×
702

703
  if (manager.upload_id == 0 || manager.status != S3StatusOK) {
×
704
    s3PrintError(__FILE__, __LINE__, __func__, manager.status, manager.err_msg);
×
705
    TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(EIO), &lino, _exit);
×
706
  }
707

708
upload:
×
709
  todoContentLength -= chunk_size * manager.next_etags_pos;
×
710
  for (int seq = manager.next_etags_pos + 1; seq <= totalSeq; seq++) {
×
711
    partData.manager = &manager;
×
712
    partData.seq = seq;
×
713
    if (partData.put_object_data.gb == NULL) {
×
714
      partData.put_object_data = *data;
×
715
    }
716
    partContentLength = ((contentLength > chunk_size) ? chunk_size : contentLength);
×
717
    // printf("%s Part Seq %d, length=%d\n", srcSize ? "Copying" : "Sending", seq, partContentLength);
718
    partData.put_object_data.contentLength = partContentLength;
×
719
    partData.put_object_data.originalContentLength = partContentLength;
×
720
    partData.put_object_data.totalContentLength = todoContentLength;
×
721
    partData.put_object_data.totalOriginalContentLength = totalContentLength;
×
722
    put_prop->md5 = 0;
×
723
    do {
724
      S3_upload_part(bucket_context, object_name, put_prop, &putObjectHandler, seq, manager.upload_id,
×
725
                     partContentLength, 0, timeoutMsG, &partData);
726
    } while (S3_status_is_retryable(partData.put_object_data.status) && should_retry());
×
727
    if (partData.put_object_data.status != S3StatusOK) {
×
728
      s3PrintError(__FILE__, __LINE__, __func__, partData.put_object_data.status, partData.put_object_data.err_msg);
×
729
      TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(EIO), &lino, _exit);
×
730
    }
731
    contentLength -= chunk_size;
×
732
    todoContentLength -= chunk_size;
×
733
  }
734

735
  int i;
736
  int size = 0;
×
737
  size += growbuffer_append(&(manager.gb), "<CompleteMultipartUpload>", strlen("<CompleteMultipartUpload>"));
×
738
  char buf[256];
739
  int  n;
740
  for (i = 0; i < totalSeq; i++) {
×
741
    if (!manager.etags[i]) {
×
742
      TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(EIO), &lino, _exit);
×
743
    }
744
    n = tsnprintf(buf, sizeof(buf),
×
745
                 "<Part><PartNumber>%d</PartNumber>"
746
                 "<ETag>%s</ETag></Part>",
747
                 i + 1, manager.etags[i]);
×
748
    size += growbuffer_append(&(manager.gb), buf, n);
×
749
  }
750
  size += growbuffer_append(&(manager.gb), "</CompleteMultipartUpload>", strlen("</CompleteMultipartUpload>"));
×
751
  manager.remaining = size;
×
752

753
  do {
754
    S3_complete_multipart_upload(bucket_context, object_name, &commit_handler, manager.upload_id, manager.remaining, 0,
×
755
                                 timeoutMsG, &manager);
756
  } while (S3_status_is_retryable(manager.status) && should_retry());
×
757
  if (manager.status != S3StatusOK) {
×
758
    s3PrintError(__FILE__, __LINE__, __func__, manager.status, manager.err_msg);
×
759
    TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(EIO), &lino, _exit);
×
760
  }
761

762
_exit:
×
763
  if (code) {
×
764
    uError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
765
  }
766
  if (manager.upload_id) {
×
767
    taosMemoryFree(manager.upload_id);
×
768
  }
769
  for (i = 0; i < manager.next_etags_pos; i++) {
×
770
    taosMemoryFree(manager.etags[i]);
×
771
  }
772
  growbuffer_destroy(manager.gb);
×
773
  taosMemoryFree(manager.etags);
×
774

775
  TAOS_RETURN(code);
×
776
}
777

778
static int32_t s3PutObjectFromFileWithCp(S3BucketContext *bucket_context, const char *file, int32_t lmtime,
×
779
                                         char const *object_name, int64_t contentLength, S3PutProperties *put_prop,
780
                                         put_object_callback_data *data) {
781
  int32_t code = 0, lino = 0;
×
782

783
  uint64_t totalContentLength = contentLength;
×
784
  // uint64_t      todoContentLength = contentLength;
785
  UploadManager manager = {0};
×
786

787
  uint64_t  chunk_size = MULTIPART_CHUNK_SIZE >> 3;
×
788
  int       totalSeq = (contentLength + chunk_size - 1) / chunk_size;
×
789
  const int max_part_num = 10000;
×
790
  if (totalSeq > max_part_num) {
×
791
    chunk_size = (contentLength + max_part_num - contentLength % max_part_num) / max_part_num;
×
792
    totalSeq = (contentLength + chunk_size - 1) / chunk_size;
×
793
  }
794

795
  bool need_init_upload = true;
×
796
  char file_cp_path[TSDB_FILENAME_LEN];
797
  (void)snprintf(file_cp_path, TSDB_FILENAME_LEN, "%s.cp", file);
×
798

799
  SCheckpoint cp = {0};
×
800
  cp.parts = taosMemoryCalloc(max_part_num, sizeof(SCheckpointPart));
×
801
  if (!cp.parts) {
×
802
    TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
803
  }
804

805
  if (taosCheckExistFile(file_cp_path)) {
×
806
    if (!cos_cp_load(file_cp_path, &cp) && cos_cp_is_valid_upload(&cp, contentLength, lmtime)) {
×
807
      manager.upload_id = strdup(cp.upload_id);
×
808
      need_init_upload = false;
×
809
    } else {
810
      TAOS_CHECK_GOTO(cos_cp_remove(file_cp_path), &lino, _exit);
×
811
    }
812
  }
813

814
  if (need_init_upload) {
×
815
    S3MultipartInitialHandler handler = {{&responsePropertiesCallbackNull, &responseCompleteCallback},
×
816
                                         &initial_multipart_callback};
817
    do {
818
      S3_initiate_multipart(bucket_context, object_name, 0, &handler, 0, timeoutMsG, &manager);
×
819
    } while (S3_status_is_retryable(manager.status) && should_retry());
×
820

821
    if (manager.upload_id == 0 || manager.status != S3StatusOK) {
×
822
      s3PrintError(__FILE__, __LINE__, __func__, manager.status, manager.err_msg);
×
823
      TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(EIO), &lino, _exit);
×
824
    }
825

826
    cos_cp_build_upload(&cp, file, contentLength, lmtime, manager.upload_id, chunk_size);
×
827
  }
828

829
  if (cos_cp_open(file_cp_path, &cp)) {
×
830
    TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(EIO), &lino, _exit);
×
831
  }
832

833
  int     part_num = 0;
×
834
  int64_t consume_bytes = 0;
×
835
  // SCheckpointPart *parts = taosMemoryCalloc(cp.part_num, sizeof(SCheckpointPart));
836
  // cos_cp_get_undo_parts(&cp, &part_num, parts, &consume_bytes);
837

838
  MultipartPartData partData;
839
  (void)memset(&partData, 0, sizeof(MultipartPartData));
×
840
  int partContentLength = 0;
×
841

842
  S3PutObjectHandler putObjectHandler = {{&MultipartResponseProperiesCallbackWithCp, &responseCompleteCallback},
×
843
                                         &putObjectDataCallback};
844

845
  S3MultipartCommitHandler commit_handler = {
×
846
      {&responsePropertiesCallbackNull, &responseCompleteCallback}, &multipartPutXmlCallback, 0};
847

848
  manager.etags = (char **)taosMemoryCalloc(totalSeq, sizeof(char *));
×
849
  if (!manager.etags) {
×
850
    TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
851
  }
852

853
  manager.next_etags_pos = 0;
×
854

855
upload:
×
856
  // todoContentLength -= chunk_size * manager.next_etags_pos;
857
  for (int i = 0; i < cp.part_num; ++i) {
×
858
    if (cp.parts[i].completed) {
×
859
      continue;
×
860
    }
861

862
    if (i > 0 && cp.parts[i - 1].completed) {
×
863
      if (taosLSeekFile(data->infileFD, cp.parts[i].offset, SEEK_SET) < 0) {
×
864
        TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
865
      }
866
    }
867

868
    int seq = cp.parts[i].index + 1;
×
869

870
    partData.manager = &manager;
×
871
    partData.seq = seq;
×
872
    if (partData.put_object_data.gb == NULL) {
×
873
      partData.put_object_data = *data;
×
874
    }
875

876
    partContentLength = cp.parts[i].size;
×
877
    partData.put_object_data.contentLength = partContentLength;
×
878
    partData.put_object_data.originalContentLength = partContentLength;
×
879
    // partData.put_object_data.totalContentLength = todoContentLength;
880
    partData.put_object_data.totalOriginalContentLength = totalContentLength;
×
881
    put_prop->md5 = 0;
×
882
    do {
883
      S3_upload_part(bucket_context, object_name, put_prop, &putObjectHandler, seq, manager.upload_id,
×
884
                     partContentLength, 0, timeoutMsG, &partData);
885
    } while (S3_status_is_retryable(partData.put_object_data.status) && should_retry());
×
886
    if (partData.put_object_data.status != S3StatusOK) {
×
887
      s3PrintError(__FILE__, __LINE__, __func__, partData.put_object_data.status, partData.put_object_data.err_msg);
×
888
      TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(EIO), &lino, _exit);
×
889
    }
890

891
    if (!manager.etags[seq - 1]) {
×
892
      TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(EIO), &lino, _exit);
×
893
    }
894

895
    cos_cp_update(&cp, cp.parts[seq - 1].index, manager.etags[seq - 1], 0);
×
896
    TAOS_CHECK_GOTO(cos_cp_dump(&cp), &lino, _exit);
×
897

898
    contentLength -= chunk_size;
×
899
    // todoContentLength -= chunk_size;
900
  }
901

902
  TAOS_CHECK_GOTO(cos_cp_close(cp.thefile), &lino, _exit);
×
903
  cp.thefile = 0;
×
904

905
  int size = 0;
×
906
  size += growbuffer_append(&(manager.gb), "<CompleteMultipartUpload>", strlen("<CompleteMultipartUpload>"));
×
907
  char buf[256];
908
  int  n;
909
  for (int i = 0; i < cp.part_num; ++i) {
×
910
    n = tsnprintf(buf, sizeof(buf),
×
911
                 "<Part><PartNumber>%d</PartNumber>"
912
                 "<ETag>%s</ETag></Part>",
913
                 // i + 1, manager.etags[i]);
914
                 cp.parts[i].index + 1, cp.parts[i].etag);
×
915
    size += growbuffer_append(&(manager.gb), buf, n);
×
916
  }
917
  size += growbuffer_append(&(manager.gb), "</CompleteMultipartUpload>", strlen("</CompleteMultipartUpload>"));
×
918
  manager.remaining = size;
×
919

920
  do {
921
    S3_complete_multipart_upload(bucket_context, object_name, &commit_handler, manager.upload_id, manager.remaining, 0,
×
922
                                 timeoutMsG, &manager);
923
  } while (S3_status_is_retryable(manager.status) && should_retry());
×
924
  if (manager.status != S3StatusOK) {
×
925
    s3PrintError(__FILE__, __LINE__, __func__, manager.status, manager.err_msg);
×
926
    TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(EIO), &lino, _exit);
×
927
  }
928

929
  TAOS_CHECK_GOTO(cos_cp_remove(file_cp_path), &lino, _exit);
×
930

931
_exit:
×
932
  /*
933
  if (parts) {
934
    taosMemoryFree(parts);
935
  }
936
  */
937
  if (code) {
×
938
    uError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
939
  }
940

941
  if (cp.thefile) {
×
942
    if (cos_cp_close(cp.thefile)) {
×
943
      uError("%s failed at line %d to close cp file.", __func__, lino);
×
944
    }
945
  }
946
  if (cp.parts) {
×
947
    taosMemoryFree(cp.parts);
×
948
  }
949

950
  if (manager.upload_id) {
×
951
    taosMemoryFree(manager.upload_id);
×
952
  }
953
  for (int i = 0; i < cp.part_num; ++i) {
×
954
    if (manager.etags[i]) {
×
955
      taosMemoryFree(manager.etags[i]);
×
956
    }
957
  }
958
  taosMemoryFree(manager.etags);
×
959
  growbuffer_destroy(manager.gb);
×
960

961
  TAOS_RETURN(code);
×
962
}
963

964
int32_t s3PutObjectFromFile2ByEp(const char *file, const char *object_name, int8_t withcp, int8_t epIndex) {
×
965
  int32_t                  code = 0;
×
966
  int32_t                  lmtime = 0;
×
967
  const char              *filename = 0;
×
968
  uint64_t                 contentLength = 0;
×
969
  const char              *cacheControl = 0, *contentType = 0, *md5 = 0;
×
970
  const char              *contentDispositionFilename = 0, *contentEncoding = 0;
×
971
  int64_t                  expires = -1;
×
972
  S3CannedAcl              cannedAcl = S3CannedAclPrivate;
×
973
  int                      metaPropertiesCount = 0;
×
974
  S3NameValue              metaProperties[S3_MAX_METADATA_COUNT];
975
  char                     useServerSideEncryption = 0;
×
976
  put_object_callback_data data = {0};
×
977

978
  if (taosStatFile(file, (int64_t *)&contentLength, &lmtime, NULL) < 0) {
×
979
    uError("ERROR: %s Failed to stat file %s: ", __func__, file);
×
980
    TAOS_RETURN(terrno);
×
981
  }
982

983
  if (!(data.infileFD = taosOpenFile(file, TD_FILE_READ))) {
×
984
    uError("ERROR: %s Failed to open file %s: ", __func__, file);
×
985
    TAOS_RETURN(terrno);
×
986
  }
987

988
  data.totalContentLength = data.totalOriginalContentLength = data.contentLength = data.originalContentLength =
×
989
      contentLength;
990

991
  S3BucketContext bucketContext = {tsS3Hostname[epIndex],
×
992
                                   tsS3BucketName,
993
                                   protocolG[epIndex],
×
994
                                   uriStyleG[epIndex],
×
995
                                   tsS3AccessKeyId[epIndex],
×
996
                                   tsS3AccessKeySecret[epIndex],
×
997
                                   0,
998
                                   awsRegionG};
999

1000
  S3PutProperties putProperties = {contentType,     md5,
×
1001
                                   cacheControl,    contentDispositionFilename,
1002
                                   contentEncoding, expires,
1003
                                   cannedAcl,       metaPropertiesCount,
1004
                                   metaProperties,  useServerSideEncryption};
1005

1006
  if (contentLength <= MULTIPART_CHUNK_SIZE) {
×
1007
    code = s3PutObjectFromFileSimple(&bucketContext, object_name, contentLength, &putProperties, &data);
×
1008
  } else {
1009
    if (withcp) {
×
1010
      code = s3PutObjectFromFileWithCp(&bucketContext, file, lmtime, object_name, contentLength, &putProperties, &data);
×
1011
    } else {
1012
      code = s3PutObjectFromFileWithoutCp(&bucketContext, object_name, contentLength, &putProperties, &data);
×
1013
    }
1014
  }
1015

1016
  if (data.infileFD) {
×
1017
    (void)taosCloseFile(&data.infileFD);
×
1018
  } else if (data.gb) {
×
1019
    growbuffer_destroy(data.gb);
×
1020
  }
1021

1022
  TAOS_RETURN(code);
×
1023
}
1024

1025
int32_t s3PutObjectFromFile2(const char *file, const char *object_name, int8_t withcp) {
×
1026
  int32_t code = TSDB_CODE_SUCCESS;
×
1027

1028
  int8_t startIndex = taosRand() % tsS3EpNum;
×
1029
  for (int8_t i = 0; i < tsS3EpNum; ++i) {
×
1030
    int8_t epIndex = (startIndex + i) % tsS3EpNum;
×
1031
    code = s3PutObjectFromFile2ByEp(file, object_name, withcp, epIndex);
×
1032
    if (code == TSDB_CODE_SUCCESS) {
×
1033
      break;
×
1034
    }
1035
  }
1036

1037
  return code;
×
1038
}
1039

1040
static int32_t s3PutObjectFromFileOffsetByEp(const char *file, const char *object_name, int64_t offset, int64_t size,
1✔
1041
                                             int8_t epIndex) {
1042
  int32_t                  code = 0;
1✔
1043
  int32_t                  lmtime = 0;
1✔
1044
  const char              *filename = 0;
1✔
1045
  uint64_t                 contentLength = 0;
1✔
1046
  const char              *cacheControl = 0, *contentType = 0, *md5 = 0;
1✔
1047
  const char              *contentDispositionFilename = 0, *contentEncoding = 0;
1✔
1048
  int64_t                  expires = -1;
1✔
1049
  S3CannedAcl              cannedAcl = S3CannedAclPrivate;
1✔
1050
  int                      metaPropertiesCount = 0;
1✔
1051
  S3NameValue              metaProperties[S3_MAX_METADATA_COUNT];
1052
  char                     useServerSideEncryption = 0;
1✔
1053
  put_object_callback_data data = {0};
1✔
1054

1055
  if (taosStatFile(file, (int64_t *)&contentLength, &lmtime, NULL) < 0) {
1!
1056
    uError("ERROR: %s Failed to stat file %s: ", __func__, file);
×
1057
    TAOS_RETURN(terrno);
×
1058
  }
1059

1060
  contentLength = size;
1✔
1061

1062
  if (!(data.infileFD = taosOpenFile(file, TD_FILE_READ))) {
1!
1063
    uError("ERROR: %s Failed to open file %s: ", __func__, file);
×
1064
    TAOS_RETURN(terrno);
×
1065
  }
1066
  if (taosLSeekFile(data.infileFD, offset, SEEK_SET) < 0) {
1!
1067
    (void)taosCloseFile(&data.infileFD);
×
1068
    TAOS_RETURN(terrno);
×
1069
  }
1070

1071
  data.totalContentLength = data.totalOriginalContentLength = data.contentLength = data.originalContentLength =
1✔
1072
      contentLength;
1073

1074
  S3BucketContext bucketContext = {tsS3Hostname[epIndex],
1✔
1075
                                   tsS3BucketName,
1076
                                   protocolG[epIndex],
1✔
1077
                                   uriStyleG[epIndex],
1✔
1078
                                   tsS3AccessKeyId[epIndex],
1✔
1079
                                   tsS3AccessKeySecret[epIndex],
1✔
1080
                                   0,
1081
                                   awsRegionG};
1082

1083
  S3PutProperties putProperties = {contentType,     md5,
1✔
1084
                                   cacheControl,    contentDispositionFilename,
1085
                                   contentEncoding, expires,
1086
                                   cannedAcl,       metaPropertiesCount,
1087
                                   metaProperties,  useServerSideEncryption};
1088

1089
  if (contentLength <= MULTIPART_CHUNK_SIZE) {
1!
1090
    code = s3PutObjectFromFileSimple(&bucketContext, object_name, contentLength, &putProperties, &data);
1✔
1091
  } else {
1092
    code = s3PutObjectFromFileWithoutCp(&bucketContext, object_name, contentLength, &putProperties, &data);
×
1093
  }
1094

1095
  if (data.infileFD) {
1!
1096
    (void)taosCloseFile(&data.infileFD);
1✔
1097
  } else if (data.gb) {
×
1098
    growbuffer_destroy(data.gb);
×
1099
  }
1100

1101
  TAOS_RETURN(code);
1✔
1102
}
1103

1104
int32_t s3PutObjectFromFileOffset(const char *file, const char *object_name, int64_t offset, int64_t size) {
×
1105
  int32_t code = TSDB_CODE_SUCCESS;
×
1106

1107
  int8_t startIndex = taosRand() % tsS3EpNum;
×
1108
  for (int8_t i = 0; i < tsS3EpNum; ++i) {
×
1109
    int8_t epIndex = (startIndex + i) % tsS3EpNum;
×
1110
    code = s3PutObjectFromFileOffsetByEp(file, object_name, offset, size, epIndex);
×
1111
    if (code == TSDB_CODE_SUCCESS) {
×
1112
      break;
×
1113
    }
1114
  }
1115

1116
  return code;
×
1117
}
1118

1119
typedef struct list_bucket_callback_data {
1120
  char     err_msg[512];
1121
  S3Status status;
1122
  int      isTruncated;
1123
  char     nextMarker[1024];
1124
  int      keyCount;
1125
  int      allDetails;
1126
  SArray  *objectArray;
1127
} list_bucket_callback_data;
1128

1129
static S3Status listBucketCallback(int isTruncated, const char *nextMarker, int contentsCount,
1✔
1130
                                   const S3ListBucketContent *contents, int commonPrefixesCount,
1131
                                   const char **commonPrefixes, void *callbackData) {
1132
  list_bucket_callback_data *data = (list_bucket_callback_data *)callbackData;
1✔
1133

1134
  data->isTruncated = isTruncated;
1✔
1135
  if ((!nextMarker || !nextMarker[0]) && contentsCount) {
1!
1136
    nextMarker = contents[contentsCount - 1].key;
1✔
1137
  }
1138
  if (nextMarker) {
1!
1139
    (void)snprintf(data->nextMarker, sizeof(data->nextMarker), "%s", nextMarker);
1✔
1140
  } else {
1141
    data->nextMarker[0] = 0;
×
1142
  }
1143

1144
  if (contentsCount && !data->keyCount) {
1!
1145
    // printListBucketHeader(data->allDetails);
1146
  }
1147

1148
  int i;
1149
  for (i = 0; i < contentsCount; ++i) {
2✔
1150
    const S3ListBucketContent *content = &(contents[i]);
1✔
1151
    // printf("%-50s", content->key);
1152
    char *object_key = strdup(content->key);
1✔
1153
    if (!taosArrayPush(data->objectArray, &object_key)) {
2!
1154
      taosMemoryFree(object_key);
×
1155
      return S3StatusOutOfMemory;
×
1156
    }
1157
  }
1158
  data->keyCount += contentsCount;
1✔
1159

1160
  for (i = 0; i < commonPrefixesCount; i++) {
1!
1161
    // printf("\nCommon Prefix: %s\n", commonPrefixes[i]);
1162
  }
1163

1164
  return S3StatusOK;
1✔
1165
}
1166

1167
/* Array element destructor: `pItem` points at a slot holding a `char *` key; free that key. */
static void s3FreeObjectKey(void *pItem) {
  char **slot = (char **)pItem;
  taosMemoryFree(*slot);
}
1✔
1171

1172
static SArray *getListByPrefixByEp(const char *prefix, int8_t epIndex) {
1✔
1173
  S3BucketContext     bucketContext = {tsS3Hostname[epIndex],
1✔
1174
                                       tsS3BucketName,
1175
                                       protocolG[epIndex],
1✔
1176
                                       uriStyleG[epIndex],
1✔
1177
                                       tsS3AccessKeyId[epIndex],
1✔
1178
                                       tsS3AccessKeySecret[epIndex],
1✔
1179
                                       0,
1180
                                       awsRegionG};
1181
  S3ListBucketHandler listBucketHandler = {{&responsePropertiesCallbackNull, &responseCompleteCallback},
1✔
1182
                                           &listBucketCallback};
1183

1184
  const char /**marker = 0,*/ *delimiter = 0;
1✔
1185
  int /*maxkeys = 0, */        allDetails = 0;
1✔
1186
  list_bucket_callback_data    data = {0};
1✔
1187
  data.objectArray = taosArrayInit(32, sizeof(void *));
1✔
1188
  if (!data.objectArray) {
1!
1189
    uError("%s: %s", __func__, "out of memoty");
×
1190
    return NULL;
×
1191
  }
1192
  /*if (marker) {
1193
    snprintf(data.nextMarker, sizeof(data.nextMarker), "%s", marker);
1194
    } else {*/
1195
  data.nextMarker[0] = 0;
1✔
1196
  //}
1197
  data.keyCount = 0;
1✔
1198
  data.allDetails = allDetails;
1✔
1199

1200
  do {
1201
    data.isTruncated = 0;
1✔
1202
    do {
1203
      S3_list_bucket(&bucketContext, prefix, data.nextMarker, delimiter, 0 /*maxkeys*/, 0, timeoutMsG,
1✔
1204
                     &listBucketHandler, &data);
1205
    } while (S3_status_is_retryable(data.status) && should_retry());
1!
1206
    if (data.status != S3StatusOK) {
1!
1207
      break;
×
1208
    }
1209
  } while (data.isTruncated /* && (!maxkeys || (data.keyCount < maxkeys))*/);
1!
1210

1211
  if (data.status == S3StatusOK) {
1!
1212
    if (data.keyCount > 0) {
1!
1213
      return data.objectArray;
1✔
1214
    }
1215
  } else {
1216
    uError("failed to list with prefix %s: %s", prefix, S3_get_status_name(data.status));
×
1217
    // s3PrintError(__FILE__, __LINE__, __func__, data.status, data.err_msg);
1218
  }
1219

1220
  taosArrayDestroyEx(data.objectArray, s3FreeObjectKey);
×
1221
  return NULL;
×
1222
}
1223

1224
static SArray *getListByPrefix(const char *prefix) {
×
1225
  SArray *objectArray = NULL;
×
1226
  int8_t  startIndex = taosRand() % tsS3EpNum;
×
1227
  for (int8_t i = 0; i < tsS3EpNum; ++i) {
×
1228
    int8_t epIndex = (startIndex + i) % tsS3EpNum;
×
1229
    objectArray = getListByPrefixByEp(prefix, epIndex);
×
1230
    if (objectArray) {
×
1231
      break;
×
1232
    }
1233
  }
1234

1235
  return objectArray;
×
1236
}
1237

1238
static int32_t s3DeleteObjectsByEp(const char *object_name[], int nobject, int8_t epIndex) {
1✔
1239
  int32_t code = 0;
1✔
1240

1241
  S3BucketContext   bucketContext = {tsS3Hostname[epIndex],
1✔
1242
                                     tsS3BucketName,
1243
                                     protocolG[epIndex],
1✔
1244
                                     uriStyleG[epIndex],
1✔
1245
                                     tsS3AccessKeyId[epIndex],
1✔
1246
                                     tsS3AccessKeySecret[epIndex],
1✔
1247
                                     0,
1248
                                     awsRegionG};
1249
  S3ResponseHandler responseHandler = {0, &responseCompleteCallback};
1✔
1250

1251
  for (int i = 0; i < nobject; ++i) {
2✔
1252
    TS3SizeCBD cbd = {0};
1✔
1253
    do {
1254
      S3_delete_object(&bucketContext, object_name[i], 0, timeoutMsG, &responseHandler, &cbd);
1✔
1255
    } while (S3_status_is_retryable(cbd.status) && should_retry());
1!
1256

1257
    if ((cbd.status != S3StatusOK) && (cbd.status != S3StatusErrorPreconditionFailed)) {
1!
1258
      s3PrintError(__FILE__, __LINE__, __func__, cbd.status, cbd.err_msg);
×
1259
      code = TSDB_CODE_FAILED;
×
1260
    }
1261
  }
1262

1263
  TAOS_RETURN(code);
1✔
1264
}
1265

1266
int32_t s3DeleteObjects(const char *object_name[], int nobject) {
×
1267
  int32_t code = 0;
×
1268

1269
  int8_t startIndex = taosRand() % tsS3EpNum;
×
1270
  for (int8_t i = 0; i < tsS3EpNum; ++i) {
×
1271
    int8_t epIndex = (startIndex + i) % tsS3EpNum;
×
1272
    code = s3DeleteObjectsByEp(object_name, nobject, epIndex);
×
1273
    if (code == TSDB_CODE_SUCCESS) {
×
1274
      break;
×
1275
    }
1276
  }
1277

1278
  return code;
×
1279
}
1280

1281
void s3DeleteObjectsByPrefix(const char *prefix) {
×
1282
  SArray *objectArray = getListByPrefix(prefix);
×
1283
  if (objectArray == NULL) return;
×
1284
  int32_t code = s3DeleteObjects(TARRAY_DATA(objectArray), TARRAY_SIZE(objectArray));
×
1285
  if (!code) {
×
1286
    uError("%s failed at line %d since %s.", __func__, __LINE__, tstrerror(code));
×
1287
  }
1288
  taosArrayDestroyEx(objectArray, s3FreeObjectKey);
×
1289
}
1290

1291
static S3Status getObjectDataCallback(int bufferSize, const char *buffer, void *callbackData) {
1✔
1292
  TS3SizeCBD *cbd = callbackData;
1✔
1293
  /*
1294
  if (cbd->content_length != bufferSize) {
1295
    cbd->status = S3StatusAbortedByCallback;
1296
    return S3StatusAbortedByCallback;
1297
  }
1298
  */
1299
  if (!cbd->buf) {
1!
1300
    cbd->buf = taosMemoryCalloc(1, cbd->content_length);
1✔
1301
  }
1302

1303
  if (cbd->buf) {
1!
1304
    (void)memcpy(cbd->buf + cbd->buf_pos, buffer, bufferSize);
1✔
1305
    cbd->buf_pos += bufferSize;
1✔
1306
    cbd->status = S3StatusOK;
1✔
1307
    return S3StatusOK;
1✔
1308
  } else {
1309
    cbd->status = S3StatusAbortedByCallback;
×
1310
    return S3StatusAbortedByCallback;
×
1311
  }
1312
}
1313

1314
static int32_t s3GetObjectBlockByEp(const char *object_name, int64_t offset, int64_t size, bool check,
1✔
1315
                                    uint8_t **ppBlock, int8_t epIndex) {
1316
  int         status = 0;
1✔
1317
  int64_t     ifModifiedSince = -1, ifNotModifiedSince = -1;
1✔
1318
  const char *ifMatch = 0, *ifNotMatch = 0;
1✔
1319

1320
  S3BucketContext    bucketContext = {tsS3Hostname[epIndex],
1✔
1321
                                      tsS3BucketName,
1322
                                      protocolG[epIndex],
1✔
1323
                                      uriStyleG[epIndex],
1✔
1324
                                      tsS3AccessKeyId[epIndex],
1✔
1325
                                      tsS3AccessKeySecret[epIndex],
1✔
1326
                                      0,
1327
                                      awsRegionG};
1328
  S3GetConditions    getConditions = {ifModifiedSince, ifNotModifiedSince, ifMatch, ifNotMatch};
1✔
1329
  S3GetObjectHandler getObjectHandler = {{&responsePropertiesCallback, &responseCompleteCallback},
1✔
1330
                                         &getObjectDataCallback};
1331

1332
  TS3SizeCBD cbd = {0};
1✔
1333
  int        retryCount = 0;
1✔
1334
  static int maxRetryCount = 5;
1335
  static int minRetryInterval = 1000;  // ms
1336
  static int maxRetryInterval = 3000;  // ms
1337

1338
_retry:
1✔
1339
  (void)memset(&cbd, 0, sizeof(cbd));
1✔
1340
  cbd.content_length = size;
1✔
1341
  cbd.buf_pos = 0;
1✔
1342
  do {
1343
    S3_get_object(&bucketContext, object_name, &getConditions, offset, size, 0, 0, &getObjectHandler, &cbd);
1✔
1344
  } while (S3_status_is_retryable(cbd.status) && should_retry());
1!
1345

1346
  if (cbd.status != S3StatusOK) {
1!
1347
    if (S3StatusErrorSlowDown == cbd.status && retryCount++ < maxRetryCount) {
×
1348
      taosMsleep(taosRand() % (maxRetryInterval - minRetryInterval + 1) + minRetryInterval);
×
1349
      uInfo("%s: %d/%s(%s) retry get object", __func__, cbd.status, S3_get_status_name(cbd.status), cbd.err_msg);
×
1350
      goto _retry;
×
1351
    }
1352
    uError("%s: %d/%s(%s)", __func__, cbd.status, S3_get_status_name(cbd.status), cbd.err_msg);
×
1353

1354
    TAOS_RETURN(TAOS_SYSTEM_ERROR(EIO));
×
1355
  }
1356

1357
  if (check && cbd.buf_pos != size) {
1!
1358
    uError("%s: %d/%s(%s)", __func__, cbd.status, S3_get_status_name(cbd.status), cbd.err_msg);
×
1359

1360
    TAOS_RETURN(TAOS_SYSTEM_ERROR(EIO));
×
1361
  }
1362

1363
  *ppBlock = (uint8_t *)cbd.buf;
1✔
1364

1365
  TAOS_RETURN(TSDB_CODE_SUCCESS);
1✔
1366
}
1367

1368
int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size, bool check, uint8_t **ppBlock) {
×
1369
  int32_t code = 0;
×
1370

1371
  int8_t startIndex = taosRand() % tsS3EpNum;
×
1372
  for (int8_t i = 0; i < tsS3EpNum; ++i) {
×
1373
    int8_t epIndex = (startIndex + i) % tsS3EpNum;
×
1374
    code = s3GetObjectBlockByEp(object_name, offset, size, check, ppBlock, epIndex);
×
1375
    if (code == TSDB_CODE_SUCCESS) {
×
1376
      break;
×
1377
    }
1378
  }
1379

1380
  return code;
×
1381
}
1382

1383
static S3Status getObjectCallback(int bufferSize, const char *buffer, void *callbackData) {
×
1384
  TS3GetData *cbd = (TS3GetData *)callbackData;
×
1385
  size_t      wrote = taosWriteFile(cbd->file, buffer, bufferSize);
×
1386
  return ((wrote < (size_t)bufferSize) ? S3StatusAbortedByCallback : S3StatusOK);
×
1387
}
1388

1389
static int32_t s3GetObjectToFileByEp(const char *object_name, const char *fileName, int8_t epIndex) {
×
1390
  int64_t     ifModifiedSince = -1, ifNotModifiedSince = -1;
×
1391
  const char *ifMatch = 0, *ifNotMatch = 0;
×
1392

1393
  S3BucketContext    bucketContext = {tsS3Hostname[epIndex],
×
1394
                                      tsS3BucketName,
1395
                                      protocolG[epIndex],
×
1396
                                      uriStyleG[epIndex],
×
1397
                                      tsS3AccessKeyId[epIndex],
×
1398
                                      tsS3AccessKeySecret[epIndex],
×
1399
                                      0,
1400
                                      awsRegionG};
1401
  S3GetConditions    getConditions = {ifModifiedSince, ifNotModifiedSince, ifMatch, ifNotMatch};
×
1402
  S3GetObjectHandler getObjectHandler = {{&responsePropertiesCallbackNull, &responseCompleteCallback},
×
1403
                                         &getObjectCallback};
1404

1405
  TdFilePtr pFile = taosOpenFile(fileName, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
×
1406
  if (pFile == NULL) {
×
1407
    uError("[s3] open file error, terrno:%d, fileName:%s", terrno, fileName);
×
1408
    TAOS_RETURN(terrno);
×
1409
  }
1410

1411
  TS3GetData cbd = {0};
×
1412
  cbd.file = pFile;
×
1413
  do {
1414
    S3_get_object(&bucketContext, object_name, &getConditions, 0, 0, 0, 0, &getObjectHandler, &cbd);
×
1415
  } while (S3_status_is_retryable(cbd.status) && should_retry());
×
1416

1417
  if (cbd.status != S3StatusOK) {
×
1418
    uError("%s: %d(%s)", __func__, cbd.status, cbd.err_msg);
×
1419
    (void)taosCloseFile(&pFile);
×
1420
    TAOS_RETURN(TAOS_SYSTEM_ERROR(EIO));
×
1421
  }
1422

1423
  (void)taosCloseFile(&pFile);
×
1424

1425
  TAOS_RETURN(TSDB_CODE_SUCCESS);
×
1426
}
1427

1428
int32_t s3GetObjectToFile(const char *object_name, const char *fileName) {
×
1429
  int32_t code = 0;
×
1430

1431
  int8_t startIndex = taosRand() % tsS3EpNum;
×
1432
  for (int8_t i = 0; i < tsS3EpNum; ++i) {
×
1433
    int8_t epIndex = (startIndex + i) % tsS3EpNum;
×
1434
    code = s3GetObjectToFileByEp(object_name, fileName, epIndex);
×
1435
    if (code == TSDB_CODE_SUCCESS) {
×
1436
      break;
×
1437
    }
1438
  }
1439

1440
  return code;
×
1441
}
1442

1443
int32_t s3GetObjectsByPrefix(const char *prefix, const char *path) {
×
1444
  SArray *objectArray = getListByPrefix(prefix);
×
1445
  if (objectArray == NULL) TAOS_RETURN(TSDB_CODE_FAILED);
×
1446

1447
  for (size_t i = 0; i < taosArrayGetSize(objectArray); i++) {
×
1448
    char       *object = taosArrayGetP(objectArray, i);
×
1449
    const char *tmp = strchr(object, '/');
×
1450
    tmp = (tmp == NULL) ? object : tmp + 1;
×
1451
    char fileName[PATH_MAX] = {0};
×
1452
    if (path[strlen(path) - 1] != TD_DIRSEP_CHAR) {
×
1453
      (void)snprintf(fileName, PATH_MAX, "%s%s%s", path, TD_DIRSEP, tmp);
×
1454
    } else {
1455
      (void)snprintf(fileName, PATH_MAX, "%s%s", path, tmp);
×
1456
    }
1457
    if (s3GetObjectToFile(object, fileName) != 0) {
×
1458
      taosArrayDestroyEx(objectArray, s3FreeObjectKey);
×
1459
      TAOS_RETURN(TSDB_CODE_FAILED);
×
1460
    }
1461
  }
1462
  taosArrayDestroyEx(objectArray, s3FreeObjectKey);
×
1463
  return 0;
×
1464
}
1465

1466
static long s3SizeByEp(const char *object_name, int8_t epIndex) {
×
1467
  long size = 0;
×
1468
  int  status = 0;
×
1469

1470
  S3BucketContext bucketContext = {tsS3Hostname[epIndex],
×
1471
                                   tsS3BucketName,
1472
                                   protocolG[epIndex],
×
1473
                                   uriStyleG[epIndex],
×
1474
                                   tsS3AccessKeyId[epIndex],
×
1475
                                   tsS3AccessKeySecret[epIndex],
×
1476
                                   0,
1477
                                   awsRegionG};
1478

1479
  S3ResponseHandler responseHandler = {&responsePropertiesCallback, &responseCompleteCallback};
×
1480

1481
  TS3SizeCBD cbd = {0};
×
1482
  do {
1483
    S3_head_object(&bucketContext, object_name, 0, 0, &responseHandler, &cbd);
×
1484
  } while (S3_status_is_retryable(cbd.status) && should_retry());
×
1485

1486
  if ((cbd.status != S3StatusOK) && (cbd.status != S3StatusErrorPreconditionFailed)) {
×
1487
    s3PrintError(__FILE__, __LINE__, __func__, cbd.status, cbd.err_msg);
×
1488

1489
    TAOS_RETURN(TSDB_CODE_FAILED);
×
1490
  }
1491

1492
  size = cbd.content_length;
×
1493

1494
  return size;
×
1495
}
1496

1497
long s3Size(const char *object_name) {
×
1498
  long size = 0;
×
1499

1500
  int8_t startIndex = taosRand() % tsS3EpNum;
×
1501
  for (int8_t i = 0; i < tsS3EpNum; ++i) {
×
1502
    int8_t epIndex = (startIndex + i) % tsS3EpNum;
×
1503
    size = s3SizeByEp(object_name, epIndex);
×
1504
    if (size > 0) {
×
1505
      break;
×
1506
    }
1507
  }
1508

1509
  return size;
×
1510
}
1511

1512
/* No-op in this build branch: the body is empty, so `path` and `object_size` are ignored. */
void s3EvictCache(const char *path, long object_size) {}
×
1513

1514
#elif defined(USE_COS)
1515

1516
#include "cos_api.h"
1517
#include "cos_http_io.h"
1518
#include "cos_log.h"
1519

1520
/* USE_COS variant: performs no work and returns TSDB_CODE_SUCCESS via TAOS_RETURN. */
int32_t s3Begin() { TAOS_RETURN(TSDB_CODE_SUCCESS); }
1521

1522
int32_t s3Init() {
1523
  if (cos_http_io_initialize(NULL, 0) != COSE_OK) {
1524
    return -1;
1525
  }
1526

1527
  // set log level, default COS_LOG_WARN
1528
  cos_log_set_level(COS_LOG_WARN);
1529

1530
  // set log output, default stderr
1531
  cos_log_set_output(NULL);
1532

1533
  TAOS_RETURN(TSDB_CODE_SUCCESS);
1534
}
1535

1536
// void s3CleanUp() { cos_http_io_deinitialize(); }
1537

1538
/* Log the numeric code of a COS status, plus each optional text field that is set. */
static void log_status(cos_status_t *s) {
  cos_warn_log("status->code: %d", s->code);

  if (s->error_code) {
    cos_warn_log("status->error_code: %s", s->error_code);
  }
  if (s->error_msg) {
    cos_warn_log("status->error_msg: %s", s->error_msg);
  }
  if (s->req_id) {
    cos_warn_log("status->req_id: %s", s->req_id);
  }
}
1544

1545
/*
 * Populate a request-options object with the COS connection settings.
 *
 * Creates the config and the HTTP controller from options->pool, then copies
 * the endpoint, credentials and app id from the global S3 settings.
 *
 * options  - request options whose pool has already been set up
 * is_cname - stored verbatim into the config's is_cname field
 */
static void s3InitRequestOptions(cos_request_options_t *options, int is_cname) {
  options->config = cos_config_create(options->pool);

  cos_config_t *config = options->config;

  // The endpoint/credential globals are tables with one TSDB_FQDN_LEN-sized
  // row per endpoint. The COS backend talks to a single endpoint, so pick the
  // first row explicitly instead of passing the whole table (which has an
  // incompatible pointer type for a string argument).
  cos_str_set(&config->endpoint, tsS3Endpoint[0]);
  cos_str_set(&config->access_key_id, tsS3AccessKeyId[0]);
  cos_str_set(&config->access_key_secret, tsS3AccessKeySecret[0]);
  cos_str_set(&config->appid, tsS3AppId[0]);

  config->is_cname = is_cname;

  options->ctl = cos_http_controller_create(options->pool, 0);
}
1559

1560
int32_t s3PutObjectFromFile(const char *file_str, const char *object_str) {
1561
  int32_t                code = 0;
1562
  cos_pool_t            *p = NULL;
1563
  int                    is_cname = 0;
1564
  cos_status_t          *s = NULL;
1565
  cos_request_options_t *options = NULL;
1566
  cos_string_t           bucket, object, file;
1567
  cos_table_t           *resp_headers;
1568
  // int                    traffic_limit = 0;
1569

1570
  cos_pool_create(&p, NULL);
1571
  options = cos_request_options_create(p);
1572
  s3InitRequestOptions(options, is_cname);
1573
  cos_table_t *headers = NULL;
1574
  /*
1575
  if (traffic_limit) {
1576
    // 限速值设置范围为819200 - 838860800,即100KB/s - 100MB/s,如果超出该范围将返回400错误
1577
    headers = cos_table_make(p, 1);
1578
    cos_table_add_int(headers, "x-cos-traffic-limit", 819200);
1579
  }
1580
  */
1581
  cos_str_set(&bucket, tsS3BucketName);
1582
  cos_str_set(&file, file_str);
1583
  cos_str_set(&object, object_str);
1584
  s = cos_put_object_from_file(options, &bucket, &object, &file, headers, &resp_headers);
1585
  log_status(s);
1586

1587
  cos_pool_destroy(p);
1588

1589
  if (s->code != 200) {
1590
    TAOS_RETURN(s->code);
1591
  }
1592

1593
  TAOS_RETURN(code);
1594
}
1595

1596
int32_t s3PutObjectFromFile2(const char *file_str, const char *object_str, int8_t withcp) {
1597
  int32_t                     code = 0;
1598
  cos_pool_t                 *p = NULL;
1599
  int                         is_cname = 0;
1600
  cos_status_t               *s = NULL;
1601
  cos_request_options_t      *options = NULL;
1602
  cos_string_t                bucket, object, file;
1603
  cos_table_t                *resp_headers;
1604
  int                         traffic_limit = 0;
1605
  cos_table_t                *headers = NULL;
1606
  cos_resumable_clt_params_t *clt_params = NULL;
1607

1608
  (void)withcp;
1609
  cos_pool_create(&p, NULL);
1610
  options = cos_request_options_create(p);
1611
  s3InitRequestOptions(options, is_cname);
1612
  headers = cos_table_make(p, 0);
1613
  cos_str_set(&bucket, tsS3BucketName);
1614
  cos_str_set(&file, file_str);
1615
  cos_str_set(&object, object_str);
1616

1617
  // upload
1618
  clt_params = cos_create_resumable_clt_params_content(p, 1024 * 1024, 8, COS_FALSE, NULL);
1619
  s = cos_resumable_upload_file(options, &bucket, &object, &file, headers, NULL, clt_params, NULL, &resp_headers, NULL);
1620

1621
  log_status(s);
1622
  if (!cos_status_is_ok(s)) {
1623
    vError("s3: %d(%s)", s->code, s->error_msg);
1624
    vError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
1625
    code = TAOS_SYSTEM_ERROR(EIO);
1626
    return code;
1627
  }
1628

1629
  cos_pool_destroy(p);
1630

1631
  if (s->code != 200) {
1632
    TAOS_RETURN(s->code);
1633
  }
1634

1635
  TAOS_RETURN(code);
1636
}
1637

1638
void s3DeleteObjectsByPrefix(const char *prefix_str) {
1639
  cos_pool_t            *p = NULL;
1640
  cos_request_options_t *options = NULL;
1641
  int                    is_cname = 0;
1642
  cos_string_t           bucket;
1643
  cos_status_t          *s = NULL;
1644
  cos_string_t           prefix;
1645

1646
  cos_pool_create(&p, NULL);
1647
  options = cos_request_options_create(p);
1648
  s3InitRequestOptions(options, is_cname);
1649
  cos_str_set(&bucket, tsS3BucketName);
1650
  cos_str_set(&prefix, prefix_str);
1651

1652
  s = cos_delete_objects_by_prefix(options, &bucket, &prefix);
1653
  log_status(s);
1654
  cos_pool_destroy(p);
1655
}
1656

1657
int32_t s3DeleteObjects(const char *object_name[], int nobject) {
1658
  cos_pool_t            *p = NULL;
1659
  int                    is_cname = 0;
1660
  cos_string_t           bucket;
1661
  cos_table_t           *resp_headers = NULL;
1662
  cos_request_options_t *options = NULL;
1663
  cos_list_t             object_list;
1664
  cos_list_t             deleted_object_list;
1665
  int                    is_quiet = COS_TRUE;
1666

1667
  cos_pool_create(&p, NULL);
1668
  options = cos_request_options_create(p);
1669
  s3InitRequestOptions(options, is_cname);
1670
  cos_str_set(&bucket, tsS3BucketName);
1671

1672
  cos_list_init(&object_list);
1673
  cos_list_init(&deleted_object_list);
1674

1675
  for (int i = 0; i < nobject; ++i) {
1676
    cos_object_key_t *content = cos_create_cos_object_key(p);
1677
    cos_str_set(&content->key, object_name[i]);
1678
    cos_list_add_tail(&content->node, &object_list);
1679
  }
1680

1681
  cos_status_t *s = cos_delete_objects(options, &bucket, &object_list, is_quiet, &resp_headers, &deleted_object_list);
1682
  log_status(s);
1683

1684
  cos_pool_destroy(p);
1685

1686
  if (cos_status_is_ok(s)) {
1687
    cos_warn_log("delete objects succeeded\n");
1688
  } else {
1689
    cos_warn_log("delete objects failed\n");
1690
  }
1691

1692
  TAOS_RETURN(TSDB_CODE_SUCCESS);
1693
}
1694

1695
bool s3Exists(const char *object_name) {
1696
  bool                      ret = false;
1697
  cos_pool_t               *p = NULL;
1698
  int                       is_cname = 0;
1699
  cos_status_t             *s = NULL;
1700
  cos_request_options_t    *options = NULL;
1701
  cos_string_t              bucket;
1702
  cos_string_t              object;
1703
  cos_table_t              *resp_headers;
1704
  cos_table_t              *headers = NULL;
1705
  cos_object_exist_status_e object_exist;
1706

1707
  cos_pool_create(&p, NULL);
1708
  options = cos_request_options_create(p);
1709
  s3InitRequestOptions(options, is_cname);
1710
  cos_str_set(&bucket, tsS3BucketName);
1711
  cos_str_set(&object, object_name);
1712

1713
  s = cos_check_object_exist(options, &bucket, &object, headers, &object_exist, &resp_headers);
1714
  if (object_exist == COS_OBJECT_NON_EXIST) {
1715
    cos_warn_log("object: %.*s non exist.\n", object.len, object.data);
1716
  } else if (object_exist == COS_OBJECT_EXIST) {
1717
    ret = true;
1718
    cos_warn_log("object: %.*s exist.\n", object.len, object.data);
1719
  } else {
1720
    cos_warn_log("object: %.*s unknown status.\n", object.len, object.data);
1721
    log_status(s);
1722
  }
1723

1724
  cos_pool_destroy(p);
1725

1726
  return ret;
1727
}
1728

1729
bool s3Get(const char *object_name, const char *path) {
1730
  bool                   ret = false;
1731
  cos_pool_t            *p = NULL;
1732
  int                    is_cname = 0;
1733
  cos_status_t          *s = NULL;
1734
  cos_request_options_t *options = NULL;
1735
  cos_string_t           bucket;
1736
  cos_string_t           object;
1737
  cos_string_t           file;
1738
  cos_table_t           *resp_headers = NULL;
1739
  cos_table_t           *headers = NULL;
1740
  int                    traffic_limit = 0;
1741

1742
  // 创建内存池
1743
  cos_pool_create(&p, NULL);
1744

1745
  // 初始化请求选项
1746
  options = cos_request_options_create(p);
1747
  s3InitRequestOptions(options, is_cname);
1748
  cos_str_set(&bucket, tsS3BucketName);
1749
  if (traffic_limit) {
1750
    // 限速值设置范围为819200 - 838860800,即100KB/s - 100MB/s,如果超出该范围将返回400错误
1751
    headers = cos_table_make(p, 1);
1752
    cos_table_add_int(headers, "x-cos-traffic-limit", 819200);
1753
  }
1754

1755
  // 下载对象
1756
  cos_str_set(&file, path);
1757
  cos_str_set(&object, object_name);
1758
  s = cos_get_object_to_file(options, &bucket, &object, headers, NULL, &file, &resp_headers);
1759
  if (cos_status_is_ok(s)) {
1760
    ret = true;
1761
    cos_warn_log("get object succeeded\n");
1762
  } else {
1763
    cos_warn_log("get object failed\n");
1764
  }
1765

1766
  // 销毁内存池
1767
  cos_pool_destroy(p);
1768

1769
  return ret;
1770
}
1771

1772
int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t block_size, bool check, uint8_t **ppBlock) {
1773
  (void)check;
1774
  int32_t                code = 0, lino = 0;
1775
  cos_pool_t            *p = NULL;
1776
  int                    is_cname = 0;
1777
  cos_status_t          *s = NULL;
1778
  cos_request_options_t *options = NULL;
1779
  cos_string_t           bucket;
1780
  cos_string_t           object;
1781
  cos_table_t           *resp_headers;
1782
  cos_table_t           *headers = NULL;
1783
  cos_buf_t             *content = NULL;
1784
  // cos_string_t file;
1785
  // int  traffic_limit = 0;
1786
  char range_buf[64];
1787

1788
  // 创建内存池
1789
  cos_pool_create(&p, NULL);
1790

1791
  // 初始化请求选项
1792
  options = cos_request_options_create(p);
1793
  // init_test_request_options(options, is_cname);
1794
  s3InitRequestOptions(options, is_cname);
1795
  cos_str_set(&bucket, tsS3BucketName);
1796
  cos_str_set(&object, object_name);
1797
  cos_list_t download_buffer;
1798
  cos_list_init(&download_buffer);
1799
  /*
1800
  if (traffic_limit) {
1801
    // 限速值设置范围为819200 - 838860800,单位默认为 bit/s,即800Kb/s - 800Mb/s,如果超出该范围将返回400错误
1802
    headers = cos_table_make(p, 1);
1803
    cos_table_add_int(headers, "x-cos-traffic-limit", 819200);
1804
  }
1805
  */
1806

1807
  headers = cos_table_create_if_null(options, headers, 1);
1808
  apr_snprintf(range_buf, sizeof(range_buf), "bytes=%" APR_INT64_T_FMT "-%" APR_INT64_T_FMT, offset,
1809
               offset + block_size - 1);
1810
  apr_table_add(headers, COS_RANGE, range_buf);
1811

1812
  s = cos_get_object_to_buffer(options, &bucket, &object, headers, NULL, &download_buffer, &resp_headers);
1813
  log_status(s);
1814
  if (!cos_status_is_ok(s)) {
1815
    uError("s3: %d(%s)", s->code, s->error_msg);
1816
    uError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
1817
    TAOS_RETURN(TAOS_SYSTEM_ERROR(EIO));
1818
  }
1819

1820
  // print_headers(resp_headers);
1821
  int64_t len = 0;
1822
  int64_t size = 0;
1823
  int64_t pos = 0;
1824
  cos_list_for_each_entry(cos_buf_t, content, &download_buffer, node) { len += cos_buf_size(content); }
1825
  // char *buf = cos_pcalloc(p, (apr_size_t)(len + 1));
1826
  char *buf = taosMemoryCalloc(1, (apr_size_t)(len));
1827
  if (!buf) {
1828
    TAOS_CHECK_GOTO(terrno, &lino, _exit);
1829
  }
1830

1831
  // buf[len] = '\0';
1832
  cos_list_for_each_entry(cos_buf_t, content, &download_buffer, node) {
1833
    size = cos_buf_size(content);
1834
    memcpy(buf + pos, content->pos, (size_t)size);
1835
    pos += size;
1836
  }
1837
  // cos_warn_log("Download data=%s", buf);
1838

1839
_exit:
1840
  // 销毁内存池
1841
  cos_pool_destroy(p);
1842

1843
  *ppBlock = buf;
1844

1845
  TAOS_RETURN(code);
1846
}
1847

1848
typedef struct {
1849
  int64_t size;
1850
  int32_t atime;
1851
  char    name[TSDB_FILENAME_LEN];
1852
} SEvictFile;
1853

1854
static int32_t evictFileCompareAsce(const void *pLeft, const void *pRight) {
1855
  SEvictFile *lhs = (SEvictFile *)pLeft;
1856
  SEvictFile *rhs = (SEvictFile *)pRight;
1857
  return lhs->atime < rhs->atime ? -1 : 1;
1858
}
1859

1860
void s3EvictCache(const char *path, long object_size) {
1861
  SDiskSize disk_size = {0};
1862
  char      dir_name[TSDB_FILENAME_LEN] = "\0";
1863

1864
  tstrncpy(dir_name, path, TSDB_FILENAME_LEN);
1865
  taosDirName(dir_name);
1866

1867
  if (taosGetDiskSize((char *)dir_name, &disk_size) < 0) {
1868
    vError("failed to get disk:%s size since %s", path, terrstr());
1869
    return;
1870
  }
1871

1872
  if (object_size >= disk_size.avail - (1 << 30)) {
1873
    // evict too old files
1874
    // 1, list data files' atime under dir(path)
1875
    tdbDirPtr pDir = taosOpenDir(dir_name);
1876
    if (pDir == NULL) {
1877
      vError("failed to open %s since %s", dir_name, terrstr());
1878
    }
1879
    SArray        *evict_files = taosArrayInit(16, sizeof(SEvictFile));
1880
    tdbDirEntryPtr pDirEntry;
1881
    while ((pDirEntry = taosReadDir(pDir)) != NULL) {
1882
      char *name = taosGetDirEntryName(pDirEntry);
1883
      if (!strncmp(name + strlen(name) - 5, ".data", 5)) {
1884
        SEvictFile e_file = {0};
1885
        char       entry_name[TSDB_FILENAME_LEN] = "\0";
1886
        int        dir_len = strlen(dir_name);
1887

1888
        memcpy(e_file.name, dir_name, dir_len);
1889
        e_file.name[dir_len] = '/';
1890
        memcpy(e_file.name + dir_len + 1, name, strlen(name));
1891

1892
        taosStatFile(e_file.name, &e_file.size, NULL, &e_file.atime);
1893

1894
        taosArrayPush(evict_files, &e_file);
1895
      }
1896
    }
1897
    taosCloseDir(&pDir);
1898

1899
    // 2, sort by atime
1900
    taosArraySort(evict_files, evictFileCompareAsce);
1901

1902
    // 3, remove files ascendingly until we get enough object_size space
1903
    long   evict_size = 0;
1904
    size_t ef_size = TARRAY_SIZE(evict_files);
1905
    for (size_t i = 0; i < ef_size; ++i) {
1906
      SEvictFile *evict_file = taosArrayGet(evict_files, i);
1907
      taosRemoveFile(evict_file->name);
1908
      evict_size += evict_file->size;
1909
      if (evict_size >= object_size) {
1910
        break;
1911
      }
1912
    }
1913

1914
    taosArrayDestroy(evict_files);
1915
  }
1916
}
1917

1918
long s3Size(const char *object_name) {
1919
  long size = 0;
1920

1921
  cos_pool_t            *p = NULL;
1922
  int                    is_cname = 0;
1923
  cos_status_t          *s = NULL;
1924
  cos_request_options_t *options = NULL;
1925
  cos_string_t           bucket;
1926
  cos_string_t           object;
1927
  cos_table_t           *resp_headers = NULL;
1928

1929
  // 创建内存池
1930
  cos_pool_create(&p, NULL);
1931

1932
  // 初始化请求选项
1933
  options = cos_request_options_create(p);
1934
  s3InitRequestOptions(options, is_cname);
1935
  cos_str_set(&bucket, tsS3BucketName);
1936

1937
  // 获取对象元数据
1938
  cos_str_set(&object, object_name);
1939
  s = cos_head_object(options, &bucket, &object, NULL, &resp_headers);
1940
  // print_headers(resp_headers);
1941
  if (cos_status_is_ok(s)) {
1942
    char *content_length_str = (char *)apr_table_get(resp_headers, COS_CONTENT_LENGTH);
1943
    if (content_length_str != NULL) {
1944
      size = atol(content_length_str);
1945
    }
1946
    cos_warn_log("head object succeeded: %ld\n", size);
1947
  } else {
1948
    cos_warn_log("head object failed\n");
1949
  }
1950

1951
  // 销毁内存池
1952
  cos_pool_destroy(p);
1953

1954
  return size;
1955
}
1956

1957
#else
1958

1959
int32_t s3Init() { return 0; }
1960
int32_t s3Begin() { TAOS_RETURN(TSDB_CODE_SUCCESS); }
1961

1962
void    s3End() {}
1963
int32_t s3CheckCfg() { return 0; }
1964
int32_t s3PutObjectFromFile(const char *file, const char *object) { return 0; }
1965
int32_t s3PutObjectFromFile2(const char *file, const char *object, int8_t withcp) { return 0; }
1966
int32_t s3PutObjectFromFileOffset(const char *file, const char *object_name, int64_t offset, int64_t size) { return 0; }
1967
void    s3DeleteObjectsByPrefix(const char *prefix) {}
1968
int32_t s3DeleteObjects(const char *object_name[], int nobject) { return 0; }
1969
bool    s3Exists(const char *object_name) { return false; }
1970
bool    s3Get(const char *object_name, const char *path) { return false; }
1971
int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size, bool check, uint8_t **ppBlock) {
1972
  return 0;
1973
}
1974
void    s3EvictCache(const char *path, long object_size) {}
1975
long    s3Size(const char *object_name) { return 0; }
1976
int32_t s3GetObjectsByPrefix(const char *prefix, const char *path) { return 0; }
1977
int32_t s3GetObjectToFile(const char *object_name, const char *fileName) { return 0; }
1978

1979
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc