• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4914

06 Jan 2026 01:30AM UTC coverage: 64.876% (-0.008%) from 64.884%
#4914

push

travis-ci

web-flow
merge: from main to 3.0 branch #34167

180 of 319 new or added lines in 14 files covered. (56.43%)

3475 existing lines in 124 files now uncovered.

194993 of 300563 relevant lines covered (64.88%)

116239151.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

71.07
/source/util/src/tcompression.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
/* README.md   TAOS compression
17
 *
18
 * INTEGER Compression Algorithm:
19
 *   To compress integers (including char, short, int32_t, int64_t), the difference
20
 *   between two integers is calculated at first. Then the difference is
21
 *   transformed to positive by zig-zag encoding method
22
 *   (https://gist.github.com/mfuerstenau/ba870a29e16536fdbaba). Then the value is
23
 *   encoded using simple 8B method. For more information about simple 8B,
24
 *   refer to https://en.wikipedia.org/wiki/8b/10b_encoding.
25
 *
26
 *   NOTE : For bigint, only 59 bits can be used, which means data from -(2**59) to (2**59)-1
27
 *   are allowed.
28
 *
29
 * BOOLEAN Compression Algorithm:
30
 *   We provide two methods for compress boolean types. Because boolean types in C
31
 *   code are char bytes with 0 and 1 values only, only one bit can used to discriminate
32
 *   the values.
33
 *   1. The first method is using only 1 bit to represent the boolean value with 1 for
34
 *   true and 0 for false. Then the compression rate is 1/8.
35
 *   2. The second method is using run length encoding (RLE) methods. This method works
36
 *   better when there are a lot of consecutive true values or false values.
37
 *
38
 * STRING Compression Algorithm:
39
 *   We us LZ4 method to compress the string type.
40
 *
41
 * FLOAT Compression Algorithm:
42
 *   We use the same method with Akumuli to compress float and double types. The compression
43
 *   algorithm assumes the float/double values change slightly. So we take the XOR between two
44
 *   adjacent values. Then compare the number of leading zeros and trailing zeros. If the number
45
 *   of leading zeros are larger than the trailing zeros, then record the last serveral bytes
46
 *   of the XORed value with informations. If not, record the first corresponding bytes.
47
 *
48
 */
49

50
#define _DEFAULT_SOURCE
51
#include "tcompression.h"
52
#include "lz4.h"
53
#include "tlog.h"
54
#include "ttypes.h"
55
// #include "tmsg.h"
56

57
#if defined(WINDOWS) || defined(_TD_DARWIN_64)
58
#else
59
#include "fast-lzma2.h"
60
#include "zlib.h"
61
#include "zstd.h"
62
#endif
63

64
#include "td_sz.h"
65

66
int32_t tsCompressPlain2(const char *const input, const int32_t nelements, char *const output, const char type);
67
int32_t tsDecompressPlain2(const char *const input, int32_t ninput, const int32_t nelements, char *const output,
68
                           const char type);
69
// delta
70
int32_t tsCompressTimestampImp2(const char *const input, const int32_t nelements, char *const output, const char type);
71

72
int32_t tsDecompressTimestampImp2(const char *const input, int32_t ninput, const int32_t nelements, char *const output,
73
                                  const char type);
74
// simple8b
75
int32_t tsCompressINTImp2(const char *const input, const int32_t nelements, char *const output, const char type);
76
int32_t tsDecompressINTImp2(const char *const input, int32_t ninput, const int32_t nelements, char *const output,
77
                            const char type);
78

79
// bit
80
int32_t tsCompressBoolImp2(const char *const input, const int32_t nelements, char *const output, char const type);
81
int32_t tsDecompressBoolImp2(const char *const input, int32_t ninput, const int32_t nelements, char *const output,
82
                             char const type);
83

84
// double specail
85

86
int32_t tsCompressDoubleImp2(const char *const input, const int32_t nelements, char *const output, char const type);
87
int32_t tsDecompressDoubleImp2(const char *const input, int32_t ninput, const int32_t nelements, char *const output,
88
                               char const type);
89

90
static int32_t tsEncodeDouble(const char *const input, const int32_t nelements, char *const output, char const type);
91
static int32_t tsDecodeDouble(const char *const input, int32_t ninput, const int32_t nelements, char *const output,
92
                              const char type);
93

94
int32_t tsCompressDoubleImp(const char *const input, const int32_t nelements, char *const output);
95
int32_t tsDecompressDoubleImp(const char *const input, int32_t ninput, const int32_t nelements, char *const output);
96
int32_t tsCompressFloatImp(const char *const input, const int32_t nelements, char *const output);
97
int32_t tsDecompressFloatImp(const char *const input, int32_t ninput, const int32_t nelements, char *const output);
98

99
int32_t l2ComressInitImpl_disabled(char *lossyColumns, float fPrecision, double dPrecision, uint32_t maxIntervals,
×
100
                                   uint32_t intervals, int32_t ifAdtFse, const char *compressor) {
101
  return 0;
×
102
}
103

104
int32_t l2CompressImpl_disabled(const char *const input, const int32_t inputSize, char *const output,
×
105
                                int32_t outputSize, const char type, int8_t lvl) {
106
  output[0] = 0;
×
107
  memcpy(output + 1, input, inputSize);
×
108
  return inputSize + 1;
×
109
}
110
int32_t l2DecompressImpl_disabled(const char *const input, const int32_t compressedSize, char *const output,
×
111
                                  int32_t outputSize, const char type) {
112
  memcpy(output, input + 1, compressedSize - 1);
×
113
  return compressedSize - 1;
×
114
}
115
int32_t l2ComressInitImpl_lz4(char *lossyColumns, float fPrecision, double dPrecision, uint32_t maxIntervals,
×
116
                              uint32_t intervals, int32_t ifAdtFse, const char *compressor) {
117
  return 0;
×
118
}
119

120
int32_t l2CompressImpl_lz4(const char *const input, const int32_t inputSize, char *const output, int32_t outputSize,
459,292,073✔
121
                           const char type, int8_t lvl) {
122
  const int32_t compressed_data_size = LZ4_compress_default(input, output + 1, inputSize, outputSize - 1);
459,292,073✔
123

124
  // If cannot compress or after compression, data becomes larger.
125
  if (compressed_data_size <= 0 || compressed_data_size > inputSize) {
459,326,502✔
126
    /* First byte is for indicator */
127
    output[0] = 0;
170,495,663✔
128
    memcpy(output + 1, input, inputSize);
170,555,303✔
129
    return inputSize + 1;
170,555,821✔
130
  }
131
  output[0] = 1;
288,830,839✔
132
  return compressed_data_size + 1;
288,827,541✔
133
}
134
int32_t l2DecompressImpl_lz4(const char *const input, const int32_t compressedSize, char *const output,
928,329,560✔
135
                             int32_t outputSize, const char type) {
136
  if (input[0] == 1) {
928,329,560✔
137
    /* It is compressed by LZ4 algorithm */
138
    const int32_t decompressed_size = LZ4_decompress_safe(input + 1, output, compressedSize - 1, outputSize);
373,858,165✔
139
    if (decompressed_size < 0) {
373,872,636✔
140
      uError("Failed to decompress string with LZ4 algorithm, decompressed size:%d", decompressed_size);
×
141
      return TSDB_CODE_THIRDPARTY_ERROR;
×
142
    }
143

144
    return decompressed_size;
373,872,636✔
145
  } else if (input[0] == 0) {
554,637,399✔
146
    /* It is not compressed by LZ4 algorithm */
147
    memcpy(output, input + 1, compressedSize - 1);
554,648,564✔
148
    return compressedSize - 1;
554,639,519✔
UNCOV
149
  } else if (input[1] == 2) {
×
150
    uError("Invalid decompress string indicator:%d", input[0]);
×
151
    return TSDB_CODE_THIRDPARTY_ERROR;
×
152
  }
153
  return TSDB_CODE_THIRDPARTY_ERROR;
×
154
}
155
int32_t l2ComressInitImpl_tsz(char *lossyColumns, float fPrecision, double dPrecision, uint32_t maxIntervals,
×
156
                              uint32_t intervals, int32_t ifAdtFse, const char *compressor) {
157
  return 0;
×
158
}
159
int32_t l2CompressImpl_tsz(const char *const input, const int32_t inputSize, char *const output, int32_t outputSize,
×
160
                           const char type, int8_t lvl) {
161
  if (type == TSDB_DATA_TYPE_FLOAT) {
×
162
    if (lossyFloat) {
×
163
      return tsCompressFloatLossyImp(input, inputSize, output);
×
164
    }
165
  } else if (type == TSDB_DATA_TYPE_DOUBLE) {
×
166
    if (lossyDouble) {
×
167
      return tsCompressDoubleLossyImp(input, inputSize, output);
×
168
    }
169
  }
170

171
  return l2CompressImpl_lz4(input, inputSize, output, outputSize, type, lvl);
×
172
}
173

174
int32_t l2DecompressImpl_tsz(const char *const input, const int32_t inputSize, char *const output, int32_t outputSize,
×
175
                             const char type) {
176
  if (type == TSDB_DATA_TYPE_FLOAT || type == TSDB_DATA_TYPE_DOUBLE) {
×
177
    if (HEAD_ALGO(((uint8_t *)input)[0]) == ALGO_SZ_LOSSY) {
×
178
      return tsDecompressFloatLossyImp(input, inputSize, outputSize, output);
×
179
    }
180
  }
181

182
  return l2DecompressImpl_lz4(input, inputSize, output, outputSize, type);
×
183
}
184

185
#if defined(WINDOWS) || defined(_TD_DARWIN_64)
186
// do nothing
187
#else
188

189
int32_t l2ComressInitImpl_zlib(char *lossyColumns, float fPrecision, double dPrecision, uint32_t maxIntervals,
×
190
                               uint32_t intervals, int32_t ifAdtFse, const char *compressor) {
191
  return 0;
×
192
}
193

194
int32_t l2CompressImpl_zlib(const char *const input, const int32_t inputSize, char *const output, int32_t outputSize,
52,117,329✔
195
                            const char type, int8_t lvl) {
196
  uLongf  dstLen = outputSize - 1;
52,117,329✔
197
  int32_t ret = compress2((Bytef *)(output + 1), (uLongf *)&dstLen, (Bytef *)input, (uLong)inputSize, lvl);
52,120,969✔
198
  if (ret == Z_OK) {
52,122,777✔
199
    output[0] = 1;
52,122,777✔
200
    return dstLen + 1;
52,123,699✔
201
  } else {
202
    output[0] = 0;
×
203
    memcpy(output + 1, input, inputSize);
×
204
    return inputSize + 1;
×
205
  }
206
  return TSDB_CODE_THIRDPARTY_ERROR;
207
}
208
int32_t l2DecompressImpl_zlib(const char *const input, const int32_t compressedSize, char *const output,
117,384,480✔
209
                              int32_t outputSize, const char type) {
210
  if (input[0] == 1) {
117,384,480✔
211
    uLongf len = outputSize;
117,410,245✔
212
    int    ret = uncompress((Bytef *)output, &len, (Bytef *)input + 1, compressedSize - 1);
117,421,219✔
213
    if (ret == Z_OK) {
117,408,242✔
214
      return len;
117,408,242✔
215
    } else {
216
      return TSDB_CODE_THIRDPARTY_ERROR;
×
217
    }
218

219
  } else if (input[0] == 0) {
×
220
    /* It is not compressed by LZ4 algorithm */
221
    memcpy(output, input + 1, compressedSize - 1);
×
222
    return compressedSize - 1;
×
223
  } else if (input[1] == 2) {
×
224
    uError("Invalid decompress string indicator:%d", input[0]);
×
225
    return TSDB_CODE_THIRDPARTY_ERROR;
×
226
  }
227
  return 0;
×
228
}
229
int32_t l2ComressInitImpl_zstd(char *lossyColumns, float fPrecision, double dPrecision, uint32_t maxIntervals,
×
230
                               uint32_t intervals, int32_t ifAdtFse, const char *compressor) {
231
  return 0;
×
232
}
233

234
int32_t l2CompressImpl_zstd(const char *const input, const int32_t inputSize, char *const output, int32_t outputSize,
122,258,388✔
235
                            const char type, int8_t lvl) {
236
  size_t len = ZSTD_compress(output + 1, outputSize - 1, input, inputSize, lvl);
122,258,388✔
237
  if (len > inputSize) {
122,271,873✔
238
    output[0] = 0;
45,203,969✔
239
    memcpy(output + 1, input, inputSize);
45,202,815✔
240
    return inputSize + 1;
45,210,912✔
241
  }
242
  output[0] = 1;
77,067,904✔
243

244
  return len + 1;
77,069,401✔
245
}
246
int32_t l2DecompressImpl_zstd(const char *const input, const int32_t compressedSize, char *const output,
272,001,998✔
247
                              int32_t outputSize, const char type) {
248
  if (input[0] == 1) {
272,001,998✔
249
    return ZSTD_decompress(output, outputSize, input + 1, compressedSize - 1);
121,867,682✔
250
  } else if (input[0] == 0) {
150,160,018✔
251
    memcpy(output, input + 1, compressedSize - 1);
150,158,415✔
252
    return compressedSize - 1;
150,155,406✔
253
  }
254
  return TSDB_CODE_THIRDPARTY_ERROR;
×
255
}
256

257
int32_t l2ComressInitImpl_xz(char *lossyColumns, float fPrecision, double dPrecision, uint32_t maxIntervals,
×
258
                             uint32_t intervals, int32_t ifAdtFse, const char *compressor) {
259
  return 0;
×
260
}
261
int32_t l2CompressImpl_xz(const char *const input, const int32_t inputSize, char *const output, int32_t outputSize,
×
262
                          const char type, int8_t lvl) {
263
  size_t len = FL2_compress(output + 1, outputSize - 1, input, inputSize, lvl);
×
264
  if (len > inputSize) {
×
265
    output[0] = 0;
×
266
    memcpy(output + 1, input, inputSize);
×
267
    return inputSize + 1;
×
268
  }
269
  output[0] = 1;
×
270
  return len + 1;
×
271
}
272
int32_t l2DecompressImpl_xz(const char *const input, const int32_t compressedSize, char *const output,
×
273
                            int32_t outputSize, const char type) {
274
  if (input[0] == 1) {
×
275
    return FL2_decompress(output, outputSize, input + 1, compressedSize - 1);
×
276
  } else if (input[0] == 0) {
×
277
    memcpy(output, input + 1, compressedSize - 1);
×
278
    return compressedSize - 1;
×
279
  }
280
  return TSDB_CODE_THIRDPARTY_ERROR;
×
281
}
282
#endif
283

284
TCmprL1FnSet compressL1Dict[] = {
285
    {"PLAIN", NULL, tsCompressPlain2, tsDecompressPlain2},
286
    {"SIMPLE-8B", NULL, tsCompressINTImp2, tsDecompressINTImp2},
287
    {"DELTAI", NULL, tsCompressTimestampImp2, tsDecompressTimestampImp2},
288
    {"BIT-PACKING", NULL, tsCompressBoolImp2, tsDecompressBoolImp2},
289
    {"DELTAD", NULL, tsCompressDoubleImp2, tsDecompressDoubleImp2},
290
    {"BYTE-STREAM_SPLIT", NULL, tsEncodeDouble, tsDecodeDouble},
291
};
292

293
TCmprLvlSet compressL2LevelDict[] = {
294
    {"unknown", .lvl = {1, 2, 3}}, {"lz4", .lvl = {1, 2, 3}}, {"zlib", .lvl = {1, 6, 9}},
295
    {"zstd", .lvl = {1, 11, 22}},  {"tsz", .lvl = {1, 2, 3}}, {"xz", .lvl = {1, 6, 9}},
296
};
297

298
#if defined(WINDOWS) || defined(_TD_DARWIN_64)
299
TCmprL2FnSet compressL2Dict[] = {
300
    {"unknown", l2ComressInitImpl_disabled, l2CompressImpl_disabled, l2DecompressImpl_disabled},
301
    {"lz4", l2ComressInitImpl_lz4, l2CompressImpl_lz4, l2DecompressImpl_lz4},
302
    {"zlib", l2ComressInitImpl_lz4, l2CompressImpl_lz4, l2DecompressImpl_lz4},
303
    {"zstd", l2ComressInitImpl_lz4, l2CompressImpl_lz4, l2DecompressImpl_lz4},
304
    {"tsz", l2ComressInitImpl_tsz, l2CompressImpl_tsz, l2DecompressImpl_tsz},
305
    {"xz", l2ComressInitImpl_lz4, l2CompressImpl_lz4, l2DecompressImpl_lz4}};
306
#else
307
TCmprL2FnSet compressL2Dict[] = {
308
    {"unknown", l2ComressInitImpl_disabled, l2CompressImpl_disabled, l2DecompressImpl_disabled},
309
    {"lz4", l2ComressInitImpl_lz4, l2CompressImpl_lz4, l2DecompressImpl_lz4},
310
    {"zlib", l2ComressInitImpl_zlib, l2CompressImpl_zlib, l2DecompressImpl_zlib},
311
    {"zstd", l2ComressInitImpl_zstd, l2CompressImpl_zstd, l2DecompressImpl_zstd},
312
    {"tsz", l2ComressInitImpl_tsz, l2CompressImpl_tsz, l2DecompressImpl_tsz},
313
    {"xz", l2ComressInitImpl_xz, l2CompressImpl_xz, l2DecompressImpl_xz}};
314

315
#endif
316

317
int8_t tsGetCompressL2Level(uint8_t alg, uint8_t lvl) {
584,754,115✔
318
  if (lvl == L2_LVL_LOW) {
584,754,115✔
319
    return compressL2LevelDict[alg].lvl[0];
5,473,364✔
320
  } else if (lvl == L2_LVL_MEDIUM) {
579,280,751✔
321
    return compressL2LevelDict[alg].lvl[1];
578,895,033✔
322
  } else if (lvl == L2_LVL_HIGH) {
385,871✔
323
    return compressL2LevelDict[alg].lvl[2];
421,028✔
324
  }
UNCOV
325
  return 1;
×
326
}
327

328
static const int32_t TEST_NUMBER = 1;
329
#define is_bigendian()     ((*(char *)&TEST_NUMBER) == 0)
330
#define SIMPLE8B_MAX_INT64 ((uint64_t)1152921504606846974LL)
331

332
#define safeInt64Add(a, b) (((a >= 0) && (b <= INT64_MAX - a)) || ((a < 0) && (b >= INT64_MIN - a)))
333

334
bool lossyFloat = false;
335
bool lossyDouble = false;
336

337
// init call
338
void tsCompressInit(char *lossyColumns, float fPrecision, double dPrecision, uint32_t maxIntervals, uint32_t intervals,
579,404✔
339
                    int32_t ifAdtFse, const char *compressor) {
340
  // config
341
  lossyFloat = strstr(lossyColumns, "float") != NULL;
579,404✔
342
  lossyDouble = strstr(lossyColumns, "double") != NULL;
579,404✔
343

344
  tdszInit(fPrecision, dPrecision, maxIntervals, intervals, ifAdtFse, compressor);
579,404✔
345
  if (lossyFloat) uTrace("lossy compression float  is opened. ");
579,404✔
346
  if (lossyDouble) uTrace("lossy compression double is opened. ");
579,404✔
347
  return;
579,404✔
348
}
349
// exit call
350
void tsCompressExit() { tdszExit(); }
569,496✔
351

352
/*
353
 * Compress Integer (Simple8B).
354
 */
355
int32_t tsCompressINTImp(const char *const input, const int32_t nelements, char *const output, const char type) {
377,346,926✔
356
  // Selector value:              0    1   2   3   4   5   6   7   8  9  10  11
357
  // 12  13  14  15
358
  char    bit_per_integer[] = {0, 0, 1, 2, 3, 4, 5, 6, 7, 8, 10, 12, 15, 20, 30, 60};
377,346,926✔
359
  int32_t selector_to_elems[] = {240, 120, 60, 30, 20, 15, 12, 10, 8, 7, 6, 5, 4, 3, 2, 1};
377,359,432✔
360
  char    bit_to_selector[] = {0,  2,  3,  4,  5,  6,  7,  8,  9,  10, 10, 11, 11, 12, 12, 12, 13, 13, 13, 13, 13,
377,374,107✔
361
                               14, 14, 14, 14, 14, 14, 14, 14, 14, 14, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15,
362
                               15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15};
363

364
  // get the byte limit.
365
  int32_t word_length = getWordLength(type);
377,376,667✔
366

367
  int32_t byte_limit = nelements * word_length + 1;
377,337,035✔
368
  int32_t opos = 1;
377,337,035✔
369
  int64_t prev_value = 0;
377,337,035✔
370

371
  for (int32_t i = 0; i < nelements;) {
2,147,483,647✔
372
    char    selector = 0;
2,147,483,647✔
373
    char    bit = 0;
2,147,483,647✔
374
    int32_t elems = 0;
2,147,483,647✔
375
    int64_t prev_value_tmp = prev_value;
2,147,483,647✔
376

377
    for (int32_t j = i; j < nelements; j++) {
2,147,483,647✔
378
      // Read data from the input stream and convert it to INT64 type.
379
      int64_t curr_value = 0;
2,147,483,647✔
380
      switch (type) {
2,147,483,647✔
381
        case TSDB_DATA_TYPE_TINYINT:
2,147,483,647✔
382
          curr_value = (int64_t)(*((int8_t *)input + j));
2,147,483,647✔
383
          break;
2,147,483,647✔
384
        case TSDB_DATA_TYPE_SMALLINT:
2,147,483,647✔
385
          curr_value = (int64_t)(*((int16_t *)input + j));
2,147,483,647✔
386
          break;
2,147,483,647✔
387
        case TSDB_DATA_TYPE_INT:
2,147,483,647✔
388
          curr_value = (int64_t)(*((int32_t *)input + j));
2,147,483,647✔
389
          break;
2,147,483,647✔
390
        case TSDB_DATA_TYPE_BIGINT:
2,147,483,647✔
391
          curr_value = (int64_t)(*((int64_t *)input + j));
2,147,483,647✔
392
          break;
2,147,483,647✔
393
      }
394
      // Get difference.
395
      if (!safeInt64Add(curr_value, -prev_value_tmp)) goto _copy_and_exit;
2,147,483,647✔
396

397
      int64_t diff = curr_value - prev_value_tmp;
2,147,483,647✔
398
      // Zigzag encode the value.
399
      uint64_t zigzag_value = ZIGZAG_ENCODE(int64_t, diff);
2,147,483,647✔
400

401
      if (zigzag_value >= SIMPLE8B_MAX_INT64) goto _copy_and_exit;
2,147,483,647✔
402

403
      int64_t tmp_bit;
404
      if (zigzag_value == 0) {
2,147,483,647✔
405
        // Take care here, __builtin_clzl give wrong anser for value 0;
406
        tmp_bit = 0;
2,147,483,647✔
407
      } else {
408
        tmp_bit = (LONG_BYTES * BITS_PER_BYTE) - BUILDIN_CLZL(zigzag_value);
2,147,483,647✔
409
      }
410

411
      if (elems + 1 <= selector_to_elems[(int32_t)selector] &&
2,147,483,647✔
412
          elems + 1 <= selector_to_elems[(int32_t)(bit_to_selector[(int32_t)tmp_bit])]) {
2,147,483,647✔
413
        // If can hold another one.
414
        selector = selector > bit_to_selector[(int32_t)tmp_bit] ? selector : bit_to_selector[(int32_t)tmp_bit];
2,147,483,647✔
415
        elems++;
2,147,483,647✔
416
        bit = bit_per_integer[(int32_t)selector];
2,147,483,647✔
417
      } else {
418
        // if cannot hold another one.
419
        while (elems < selector_to_elems[(int32_t)selector]) selector++;
2,147,483,647✔
420
        elems = selector_to_elems[(int32_t)selector];
2,147,483,647✔
421
        bit = bit_per_integer[(int32_t)selector];
2,147,483,647✔
422
        break;
2,147,483,647✔
423
      }
424
      prev_value_tmp = curr_value;
2,147,483,647✔
425
    }
426

427
    uint64_t buffer = 0;
2,147,483,647✔
428
    buffer |= (uint64_t)selector;
2,147,483,647✔
429
    for (int32_t k = 0; k < elems; k++) {
2,147,483,647✔
430
      int64_t curr_value = 0; /* get current values */
2,147,483,647✔
431
      switch (type) {
2,147,483,647✔
432
        case TSDB_DATA_TYPE_TINYINT:
2,147,483,647✔
433
          curr_value = (int64_t)(*((int8_t *)input + i));
2,147,483,647✔
434
          break;
2,147,483,647✔
435
        case TSDB_DATA_TYPE_SMALLINT:
2,147,483,647✔
436
          curr_value = (int64_t)(*((int16_t *)input + i));
2,147,483,647✔
437
          break;
2,147,483,647✔
438
        case TSDB_DATA_TYPE_INT:
2,147,483,647✔
439
          curr_value = (int64_t)(*((int32_t *)input + i));
2,147,483,647✔
440
          break;
2,147,483,647✔
441
        case TSDB_DATA_TYPE_BIGINT:
2,147,483,647✔
442
          curr_value = (int64_t)(*((int64_t *)input + i));
2,147,483,647✔
443
          break;
2,147,483,647✔
444
      }
445
      int64_t  diff = curr_value - prev_value;
2,147,483,647✔
446
      uint64_t zigzag_value = ZIGZAG_ENCODE(int64_t, diff);
2,147,483,647✔
447
      buffer |= ((zigzag_value & INT64MASK(bit)) << (bit * k + 4));
2,147,483,647✔
448
      i++;
2,147,483,647✔
449
      prev_value = curr_value;
2,147,483,647✔
450
    }
451

452
    // Output the encoded value to the output.
453
    if (opos + sizeof(buffer) <= byte_limit) {
2,147,483,647✔
454
      memcpy(output + opos, &buffer, sizeof(buffer));
2,147,483,647✔
455
      opos += sizeof(buffer);
2,147,483,647✔
456
    } else {
457
    _copy_and_exit:
599,966,186✔
458
      output[0] = 1;
105,354,414✔
459
      memcpy(output + 1, input, byte_limit - 1);
105,352,385✔
460
      return byte_limit;
105,364,466✔
461
    }
462
  }
463

464
  // set the indicator.
465
  output[0] = 0;
272,020,387✔
466
  return opos;
272,020,042✔
467
}
468

469
int32_t tsDecompressINTImp(const char *const input, const int32_t nelements, char *const output, const char type) {
1,872,669,319✔
470
  int32_t word_length = getWordLength(type);
1,872,669,319✔
471
  if (word_length < 0) {
1,872,753,162✔
472
    return -1;
×
473
  }
474

475
  // If not compressed.
476
  if (input[0] == 1) {
1,872,753,162✔
477
    memcpy(output, input + 1, nelements * word_length);
707,595,806✔
478
    return nelements * word_length;
707,625,766✔
479
  }
480

481
  if (tsSIMDEnable && tsAVX512Enable && tsAVX512Supported) {
1,165,211,996✔
482
    int32_t cnt = tsDecompressIntImpl_Hw(input, nelements, output, type);
×
483
    if (cnt >= 0) {
×
484
      return cnt;
×
485
    }
486
  }
487

488
  // Selector value: 0    1   2   3   4   5   6   7   8  9  10  11 12  13  14  15
489
  char    bit_per_integer[] = {0, 0, 1, 2, 3, 4, 5, 6, 7, 8, 10, 12, 15, 20, 30, 60};
1,165,211,996✔
490
  int32_t selector_to_elems[] = {240, 120, 60, 30, 20, 15, 12, 10, 8, 7, 6, 5, 4, 3, 2, 1};
1,165,256,688✔
491

492
  const char *ip = input + 1;
1,165,292,736✔
493
  char       *op = output;
1,188,933,625✔
494
  int32_t     count = 0;
1,188,933,625✔
495
  int64_t     prev_value = 0;
1,188,933,625✔
496

497
  while (count < nelements) {
2,147,483,647✔
498
    uint64_t w = *(uint64_t *)ip;
2,147,483,647✔
499

500
    char    selector = (char)(w & INT64MASK(4));       // selector = 4
2,147,483,647✔
501
    char    bit = bit_per_integer[(int32_t)selector];  // bit = 3
2,147,483,647✔
502
    int32_t elems = selector_to_elems[(int32_t)selector];
2,147,483,647✔
503

504
    switch (type) {
2,147,483,647✔
505
      case TSDB_DATA_TYPE_BIGINT: {
2,147,483,647✔
506
        int64_t *out = (int64_t *)op;
2,147,483,647✔
507
        if (selector == 0 || selector == 1) {
2,147,483,647✔
508
          for (int32_t i = 0; i < elems && count < nelements; ++i, ++count, ++out) {
2,147,483,647✔
509
            *out = prev_value;
2,147,483,647✔
510
          }
511
        } else {
512
          uint64_t zigzag_value = 0;
2,147,483,647✔
513
          for (int32_t i = 0; i < elems && count < nelements; ++i, ++count, ++out) {
2,147,483,647✔
514
            zigzag_value = ((w >> (4 + bit * i)) & INT64MASK(bit));
2,147,483,647✔
515
            prev_value += ZIGZAG_DECODE(int64_t, zigzag_value);
2,147,483,647✔
516
            *out = prev_value;
2,147,483,647✔
517
          }
518
        }
519
        op = (char *)out;
2,147,483,647✔
520
        break;
2,147,483,647✔
521
      }
522
      case TSDB_DATA_TYPE_INT: {
2,147,483,647✔
523
        int32_t *out = (int32_t *)op;
2,147,483,647✔
524
        if (selector == 0 || selector == 1) {
2,147,483,647✔
525
          for (int32_t i = 0; i < elems && count < nelements; ++i, ++count, ++out) {
2,147,483,647✔
526
            *out = (int32_t)prev_value;
2,147,483,647✔
527
          }
528
        } else {
529
          uint64_t zigzag_value = 0;
2,147,483,647✔
530
          for (int32_t i = 0; i < elems && count < nelements; ++i, ++count, ++out) {
2,147,483,647✔
531
            zigzag_value = ((w >> (4 + bit * i)) & INT64MASK(bit));
2,147,483,647✔
532
            prev_value += ZIGZAG_DECODE(int64_t, zigzag_value);
2,147,483,647✔
533
            *out = (int32_t)prev_value;
2,147,483,647✔
534
          }
535
        }
536
        op = (char *)out;
2,147,483,647✔
537
        break;
2,147,483,647✔
538
      }
539
      case TSDB_DATA_TYPE_SMALLINT: {
646,609,081✔
540
        int16_t *out = (int16_t *)op;
646,609,081✔
541
        if (selector == 0 || selector == 1) {
646,609,081✔
542
          for (int32_t i = 0; i < elems && count < nelements; ++i, ++count, ++out) {
2,147,483,647✔
543
            *out = (int16_t)prev_value;
2,147,483,647✔
544
          }
545
        } else {
546
          uint64_t zigzag_value = 0;
617,820,058✔
547
          for (int32_t i = 0; i < elems && count < nelements; ++i, ++count, ++out) {
2,147,483,647✔
548
            zigzag_value = ((w >> (4 + bit * i)) & INT64MASK(bit));
2,147,483,647✔
549
            prev_value += ZIGZAG_DECODE(int64_t, zigzag_value);
2,147,483,647✔
550
            *out = (int16_t)prev_value;
2,147,483,647✔
551
          }
552
        }
553
        op = (char *)out;
646,620,856✔
554
        break;
646,620,856✔
555
      }
556
      case TSDB_DATA_TYPE_TINYINT: {
278,478,242✔
557
        int8_t *out = (int8_t *)op;
278,478,242✔
558
        if (selector == 0 || selector == 1) {
278,478,242✔
559
          for (int32_t i = 0; i < elems && count < nelements; ++i, ++count, ++out) {
2,147,483,647✔
560
            *out = (int8_t)prev_value;
2,147,483,647✔
561
          }
562
        } else {
563
          uint64_t zigzag_value = 0;
244,934,900✔
564
          for (int32_t i = 0; i < elems && count < nelements; ++i, ++count, ++out) {
2,147,483,647✔
565
            zigzag_value = ((w >> (4 + bit * i)) & INT64MASK(bit));
2,147,483,647✔
566
            prev_value += ZIGZAG_DECODE(int64_t, zigzag_value);
2,147,483,647✔
567
            *out = (int8_t)prev_value;
2,147,483,647✔
568
          }
569
        }
570
        op = (char *)out;
278,480,356✔
571
        break;
278,480,356✔
572
      }
573
      default:
×
574
        perror("Wrong integer types.\n");
×
575
        return -1;
×
576
    }
577
    ip += LONG_BYTES;
2,147,483,647✔
578
  }
579

580
  return nelements * word_length;
1,165,253,862✔
581
}
582

583
/* ----------------------------------------------Bool Compression ---------------------------------------------- */
584
// TODO: You can also implement it using RLE method.
585
int32_t tsCompressBoolImp(const char *const input, const int32_t nelements, char *const output) {
19,438,912✔
586
  int32_t pos = -1;
19,438,912✔
587
  int32_t ele_per_byte = BITS_PER_BYTE / 2;
19,438,912✔
588

589
  for (int32_t i = 0; i < nelements; i++) {
2,147,483,647✔
590
    if (i % ele_per_byte == 0) {
2,147,483,647✔
591
      pos++;
2,147,483,647✔
592
      output[pos] = 0;
2,147,483,647✔
593
    }
594

595
    uint8_t t = 0;
2,147,483,647✔
596
    if (input[i] == 1) {
2,147,483,647✔
597
      t = (((uint8_t)1) << (2 * (i % ele_per_byte)));
2,147,483,647✔
598
      output[pos] |= t;
2,147,483,647✔
599
    } else if (input[i] == 0) {
2,147,483,647✔
600
      t = ((uint8_t)1 << (2 * (i % ele_per_byte))) - 1;
2,147,483,647✔
601
      /* t = (~((( uint8_t)1) << (7-i%BITS_PER_BYTE))); */
602
      output[pos] &= t;
2,147,483,647✔
603
    } else if (input[i] == TSDB_DATA_BOOL_NULL) {
×
604
      t = ((uint8_t)2 << (2 * (i % ele_per_byte)));
×
605
      /* t = (~((( uint8_t)1) << (7-i%BITS_PER_BYTE))); */
606
      output[pos] |= t;
×
607
    } else {
608
      uError("Invalid compress bool value:%d", output[pos]);
×
609
      return TSDB_CODE_INVALID_PARA;
×
610
    }
611
  }
612

613
  return pos + 1;
19,422,394✔
614
}
615

616
int32_t tsDecompressBoolImp(const char *const input, const int32_t nelements, char *const output) {
19,290,002✔
617
  int32_t ipos = -1, opos = 0;
19,290,002✔
618
  int32_t ele_per_byte = BITS_PER_BYTE / 2;
19,290,002✔
619

620
  for (int32_t i = 0; i < nelements; i++) {
2,147,483,647✔
621
    if (i % ele_per_byte == 0) {
2,147,483,647✔
622
      ipos++;
2,147,483,647✔
623
    }
624

625
    uint8_t ele = (input[ipos] >> (2 * (i % ele_per_byte))) & INT8MASK(2);
2,147,483,647✔
626
    if (ele == 1) {
2,147,483,647✔
627
      output[opos++] = 1;
2,147,483,647✔
628
    } else if (ele == 2) {
2,147,483,647✔
629
      output[opos++] = TSDB_DATA_BOOL_NULL;
×
630
    } else {
631
      output[opos++] = 0;
2,147,483,647✔
632
    }
633
  }
634

635
  return nelements;
19,253,055✔
636
}
637
int32_t tsCompressBoolImp2(const char *const input, const int32_t nelements, char *const output, char const type) {
19,437,244✔
638
  return tsCompressBoolImp(input, nelements, output);
19,437,244✔
639
}
640
int32_t tsDecompressBoolImp2(const char *const input, int32_t ninput, const int32_t nelements, char *const output,
19,290,452✔
641
                             char const type) {
642
  return tsDecompressBoolImp(input, nelements, output);
19,290,452✔
643
}
644

645
int32_t tsCompressDoubleImp2(const char *const input, const int32_t nelements, char *const output, char const type) {
479,423✔
646
  if (type == TSDB_DATA_TYPE_FLOAT) {
479,423✔
647
    return tsCompressFloatImp(input, nelements, output);
475,215✔
648
  } else if (type == TSDB_DATA_TYPE_DOUBLE) {
4,208✔
649
    return tsCompressDoubleImp(input, nelements, output);
4,208✔
650
  }
651
  return TSDB_CODE_THIRDPARTY_ERROR;
×
652
}
653
int32_t tsDecompressDoubleImp2(const char *const input, int32_t ninput, const int32_t nelements, char *const output,
1,337,681✔
654
                               char const type) {
655
  if (type == TSDB_DATA_TYPE_FLOAT) {
1,337,681✔
656
    return tsDecompressFloatImp(input, ninput, nelements, output);
1,337,681✔
657
  } else if (type == TSDB_DATA_TYPE_DOUBLE) {
×
658
    return tsDecompressDoubleImp(input, ninput, nelements, output);
×
659
  }
660
  return TSDB_CODE_THIRDPARTY_ERROR;
×
661
}
662
int32_t tsCompressINTImp2(const char *const input, const int32_t nelements, char *const output, const char type) {
272,010,426✔
663
  return tsCompressINTImp(input, nelements, output, type);
272,010,426✔
664
}
665
int32_t tsDecompressINTImp2(const char *const input, int32_t ninput, const int32_t nelements, char *const output,
456,124,678✔
666
                            const char type) {
667
  return tsDecompressINTImp(input, nelements, output, type);
456,124,678✔
668
}
669

670
static int32_t tsEncodeDoubleImpl(const char *const input, const int32_t inputSize, char *const output,
108,811,334✔
671
                                  const int32_t outputSize, uint8_t bytes) {
672
  if (NULL == input ||           //
108,811,334✔
673
      NULL == output ||          //
108,863,463✔
674
      inputSize <= 0 ||          //
108,872,079✔
675
      outputSize < inputSize ||  //
108,867,430✔
676
      inputSize % bytes != 0     //
108,867,430✔
677
  ) {
678
    return TSDB_CODE_INVALID_PARA;
151✔
679
  }
680

681
  int32_t pos = 0;
108,891,225✔
682
  int32_t numEles = inputSize / bytes;
108,891,225✔
683

684
  for (int32_t i = 0; i < bytes; i++) {
741,889,638✔
685
    for (int32_t j = 0; j < numEles; j++) {
2,147,483,647✔
686
      output[pos++] = input[i + j * bytes];
2,147,483,647✔
687
    }
688
  }
689

690
  return pos;
108,866,179✔
691
}
692

693
static int32_t tsDecodeDoubleImpl(const char *const input, const int32_t inputSize, char *const output,
105,891,321✔
694
                                  const int32_t outputSize, uint8_t bytes) {
695
  if (NULL == input ||           //
105,891,321✔
696
      NULL == output ||          //
105,923,933✔
697
      inputSize <= 0 ||          //
105,930,743✔
698
      outputSize < inputSize ||  //
105,921,849✔
699
      inputSize % bytes != 0     //
105,921,849✔
700
  ) {
701
    return TSDB_CODE_INVALID_PARA;
×
702
  }
703

704
  int32_t pos = 0;
105,934,138✔
705
  int32_t numEles = inputSize / bytes;
105,934,138✔
706

707
  for (int32_t i = 0; i < numEles; i++) {
2,147,483,647✔
708
    for (int32_t j = 0; j < bytes; j++) {
2,147,483,647✔
709
      output[pos++] = input[i + j * numEles];
2,147,483,647✔
710
    }
711
  }
712

713
  return pos;
97,211,684✔
714
}
715

716
static int32_t tsEncodeDouble(const char *const input, const int32_t nelements, char *const output, char const type) {
108,809,269✔
717
  if (TSDB_DATA_TYPE_FLOAT == type) {
108,809,269✔
718
    return tsEncodeDoubleImpl(input, nelements * sizeof(float), output, nelements * sizeof(float) + 1, sizeof(float));
59,309,075✔
719
  } else if (TSDB_DATA_TYPE_DOUBLE == type) {
49,500,194✔
720
    return tsEncodeDoubleImpl(input, nelements * sizeof(double), output, nelements * sizeof(double) + 1,
49,539,290✔
721
                              sizeof(double));
722
  }
723
  return TSDB_CODE_THIRDPARTY_ERROR;
×
724
}
725

726
static int32_t tsDecodeDouble(const char *const input, int32_t inputSize, const int32_t nelements, char *const output,
105,892,578✔
727
                              const char type) {
728
  if (TSDB_DATA_TYPE_FLOAT == type) {
105,892,578✔
729
    return tsDecodeDoubleImpl(input, inputSize, output, nelements * sizeof(float), sizeof(float));
67,241,279✔
730
  } else if (TSDB_DATA_TYPE_DOUBLE == type) {
38,651,299✔
731
    return tsDecodeDoubleImpl(input, inputSize, output, nelements * sizeof(double), sizeof(double));
38,662,501✔
732
  }
733
  return TSDB_CODE_THIRDPARTY_ERROR;
×
734
}
735

736
#if 0
737
/* Run Length Encoding(RLE) Method */
738
int32_t tsCompressBoolRLEImp(const char *const input, const int32_t nelements, char *const output) {
739
  int32_t _pos = 0;
740

741
  for (int32_t i = 0; i < nelements;) {
742
    unsigned char counter = 1;
743
    char          num = input[i];
744

745
    for (++i; i < nelements; i++) {
746
      if (input[i] == num) {
747
        counter++;
748
        if (counter == INT8MASK(7)) {
749
          i++;
750
          break;
751
        }
752
      } else {
753
        break;
754
      }
755
    }
756

757
    // Encode the data.
758
    if (num == 1) {
759
      output[_pos++] = INT8MASK(1) | (counter << 1);
760
    } else if (num == 0) {
761
      output[_pos++] = (counter << 1) | INT8MASK(0);
762
    } else {
763
      uError("Invalid compress bool value:%d", output[_pos]);
764
      return -1;
765
    }
766
  }
767

768
  return _pos;
769
}
770

771
int32_t tsDecompressBoolRLEImp(const char *const input, const int32_t nelements, char *const output) {
772
  int32_t ipos = 0, opos = 0;
773
  while (1) {
774
    char     encode = input[ipos++];
775
    unsigned counter = (encode >> 1) & INT8MASK(7);
776
    char     value = encode & INT8MASK(1);
777

778
    memset(output + opos, value, counter);
779
    opos += counter;
780
    if (opos >= nelements) {
781
      return nelements;
782
    }
783
  }
784
}
785
#endif
786

787
/* ----------------------------------------------String Compression ---------------------------------------------- */
788
// Note: the size of the output must be larger than input_size + 1 and
789
// LZ4_compressBound(size) + 1;
790
// >= max(input_size, LZ4_compressBound(input_size)) + 1;
791
int32_t tsCompressStringImp(const char *const input, int32_t inputSize, char *const output, int32_t outputSize) {
105,554,262✔
792
  // Try to compress using LZ4 algorithm.
793
  const int32_t compressed_data_size = LZ4_compress_default(input, output + 1, inputSize, outputSize - 1);
105,554,262✔
794

795
  // If cannot compress or after compression, data becomes larger.
796
  if (compressed_data_size <= 0 || compressed_data_size > inputSize) {
105,567,559✔
797
    /* First byte is for indicator */
798
    output[0] = 0;
91,902,254✔
799
    memcpy(output + 1, input, inputSize);
91,901,986✔
800
    return inputSize + 1;
91,904,057✔
801
  }
802

803
  output[0] = 1;
13,665,305✔
804
  return compressed_data_size + 1;
13,665,305✔
805
}
806

807
int32_t tsDecompressStringImp(const char *const input, int32_t compressedSize, char *const output, int32_t outputSize) {
1,444,910,890✔
808
  // compressedSize is the size of data after compression.
809

810
  if (input[0] == 1) {
1,444,910,890✔
811
    /* It is compressed by LZ4 algorithm */
812
    const int32_t decompressed_size = LZ4_decompress_safe(input + 1, output, compressedSize - 1, outputSize);
194,854,621✔
813
    if (decompressed_size < 0) {
194,832,417✔
814
      uError("Failed to decompress string with LZ4 algorithm, decompressed size:%d", decompressed_size);
31✔
815
      return TSDB_CODE_THIRDPARTY_ERROR;
×
816
    }
817

818
    return decompressed_size;
194,832,386✔
819
  } else if (input[0] == 0) {
1,250,104,918✔
820
    /* It is not compressed by LZ4 algorithm */
821
    memcpy(output, input + 1, compressedSize - 1);
1,250,113,388✔
822
    return compressedSize - 1;
1,250,089,730✔
823
  } else if (input[1] == 2) {
30✔
824
    uError("Invalid decompress string indicator:%d", input[0]);
×
825
    return TSDB_CODE_THIRDPARTY_ERROR;
×
826
  }
827
  return TSDB_CODE_THIRDPARTY_ERROR;
×
828
}
829

830
/* --------------------------------------------Timestamp Compression ---------------------------------------------- */
831
// TODO: Take care here, we assumes little endian encoding.
832
//
833
int32_t tsCompressTimestampImp(const char *const input, const int32_t nelements, char *const output) {
183,592,279✔
834
  int32_t _pos = 1;
183,592,279✔
835
  int32_t longBytes = LONG_BYTES;
183,592,279✔
836

837
  if (nelements < 0) {
183,592,279✔
838
    return -1;
×
839
  }
840

841
  if (nelements == 0) return 0;
183,592,279✔
842

843
  int64_t *istream = (int64_t *)input;
183,592,279✔
844

845
  int64_t prev_value = istream[0];
183,592,279✔
846
  if (prev_value >= 0x8000000000000000) {
183,627,778✔
847
    uWarn("compression timestamp is over signed long long range. ts = 0x%" PRIx64 " \n", prev_value);
28,012✔
848
    goto _exit_over;
28,012✔
849
  }
850
  int64_t  prev_delta = -prev_value;
183,599,766✔
851
  uint8_t  flags = 0, flag1 = 0, flag2 = 0;
183,599,766✔
852
  uint64_t dd1 = 0, dd2 = 0;
183,601,560✔
853

854
  for (int32_t i = 0; i < nelements; i++) {
2,147,483,647✔
855
    int64_t curr_value = istream[i];
2,147,483,647✔
856
    if (!safeInt64Add(curr_value, -prev_value)) goto _exit_over;
2,147,483,647✔
857
    int64_t curr_delta = curr_value - prev_value;
2,147,483,647✔
858
    if (!safeInt64Add(curr_delta, -prev_delta)) goto _exit_over;
2,147,483,647✔
859
    int64_t delta_of_delta = curr_delta - prev_delta;
2,147,483,647✔
860
    // zigzag encode the value.
861
    uint64_t zigzag_value = ZIGZAG_ENCODE(int64_t, delta_of_delta);
2,147,483,647✔
862
    if (i % 2 == 0) {
2,147,483,647✔
863
      flags = 0;
2,147,483,647✔
864
      dd1 = zigzag_value;
2,147,483,647✔
865
      if (dd1 == 0) {
2,147,483,647✔
866
        flag1 = 0;
2,147,483,647✔
867
      } else {
868
        flag1 = (uint8_t)(LONG_BYTES - BUILDIN_CLZL(dd1) / BITS_PER_BYTE);
2,147,483,647✔
869
      }
870
    } else {
871
      dd2 = zigzag_value;
2,147,483,647✔
872
      if (dd2 == 0) {
2,147,483,647✔
873
        flag2 = 0;
2,147,483,647✔
874
      } else {
875
        flag2 = (uint8_t)(LONG_BYTES - BUILDIN_CLZL(dd2) / BITS_PER_BYTE);
2,147,483,647✔
876
      }
877
      flags = flag1 | (flag2 << 4);
2,147,483,647✔
878
      // Encode the flag.
879
      if ((_pos + CHAR_BYTES - 1) >= nelements * longBytes) goto _exit_over;
2,147,483,647✔
880
      memcpy(output + _pos, &flags, CHAR_BYTES);
2,147,483,647✔
881
      _pos += CHAR_BYTES;
2,147,483,647✔
882
      /* Here, we assume it is little endian encoding method. */
883
      // Encode dd1
884
      if (is_bigendian()) {
2,147,483,647✔
885
        if ((_pos + flag1 - 1) >= nelements * longBytes) goto _exit_over;
×
886
        memcpy(output + _pos, (char *)(&dd1) + longBytes - flag1, flag1);
×
887
      } else {
888
        if ((_pos + flag1 - 1) >= nelements * longBytes) goto _exit_over;
2,147,483,647✔
889
        memcpy(output + _pos, (char *)(&dd1), flag1);
2,147,483,647✔
890
      }
891
      _pos += flag1;
2,147,483,647✔
892
      // Encode dd2;
893
      if (is_bigendian()) {
2,147,483,647✔
894
        if ((_pos + flag2 - 1) >= nelements * longBytes) goto _exit_over;
×
895
        memcpy(output + _pos, (char *)(&dd2) + longBytes - flag2, flag2);
×
896
      } else {
897
        if ((_pos + flag2 - 1) >= nelements * longBytes) goto _exit_over;
2,147,483,647✔
898
        memcpy(output + _pos, (char *)(&dd2), flag2);
2,147,483,647✔
899
      }
900
      _pos += flag2;
2,147,483,647✔
901
    }
902
    prev_value = curr_value;
2,147,483,647✔
903
    prev_delta = curr_delta;
2,147,483,647✔
904
  }
905

906
  if (nelements % 2 == 1) {
183,665,005✔
907
    flag2 = 0;
10,869,688✔
908
    flags = flag1 | (flag2 << 4);
10,869,688✔
909
    // Encode the flag.
910
    if ((_pos + CHAR_BYTES - 1) >= nelements * longBytes) goto _exit_over;
10,869,688✔
911
    memcpy(output + _pos, &flags, CHAR_BYTES);
10,869,688✔
912
    _pos += CHAR_BYTES;
10,869,486✔
913
    // Encode dd1;
914
    if (is_bigendian()) {
10,869,486✔
915
      if ((_pos + flag1 - 1) >= nelements * longBytes) goto _exit_over;
×
916
      memcpy(output + _pos, (char *)(&dd1) + longBytes - flag1, flag1);
×
917
    } else {
918
      if ((_pos + flag1 - 1) >= nelements * longBytes) goto _exit_over;
10,867,508✔
919
      memcpy(output + _pos, (char *)(&dd1), flag1);
10,012,019✔
920
    }
921
    _pos += flag1;
10,014,614✔
922
  }
923

924
  output[0] = 1;  // Means the string is compressed
182,809,931✔
925
  return _pos;
182,751,644✔
926

927
_exit_over:
883,501✔
928
  output[0] = 0;  // Means the string is not compressed
883,995✔
929
  memcpy(output + 1, input, nelements * longBytes);
883,995✔
930
  return nelements * longBytes + 1;
883,995✔
931
}
932

933
int32_t tsDecompressTimestampImp(const char *const input, const int32_t nelements, char *const output) {
618,640,504✔
934
  int64_t longBytes = LONG_BYTES;
618,640,504✔
935

936
  if (nelements < 0) return -1;
618,640,504✔
937
  if (nelements == 0) return 0;
618,640,504✔
938

939
  if (input[0] == 0) {
618,640,504✔
940
    memcpy(output, input + 1, nelements * longBytes);
70,704,332✔
941
    return nelements * longBytes;
70,674,269✔
942
  } else if (input[0] == 1) {  // Decompress
548,002,887✔
943
    if (tsSIMDEnable && tsAVX512Enable && tsAVX512Supported) {
547,952,396✔
944
      int32_t cnt = tsDecompressTimestampAvx512(input, nelements, output, false);
×
945
      if (cnt >= 0) {
×
946
        return cnt;
×
947
      }
948
    }
949

950
    int64_t *ostream = (int64_t *)output;
547,952,396✔
951

952
    int32_t ipos = 1, opos = 0;
547,952,396✔
953
    int8_t  nbytes = 0;
547,952,396✔
954
    int64_t prev_value = 0;
547,952,396✔
955
    int64_t prev_delta = 0;
547,952,396✔
956
    int64_t delta_of_delta = 0;
547,952,396✔
957

958
    while (1) {
2,147,483,647✔
959
      uint8_t flags = input[ipos++];
2,147,483,647✔
960
      // Decode dd1
961
      uint64_t dd1 = 0;
2,147,483,647✔
962
      nbytes = flags & INT8MASK(4);
2,147,483,647✔
963
      if (nbytes == 0) {
2,147,483,647✔
964
        delta_of_delta = 0;
2,147,483,647✔
965
      } else {
966
        if (is_bigendian()) {
2,147,483,647✔
967
          memcpy(((char *)(&dd1)) + longBytes - nbytes, input + ipos, nbytes);
×
968
        } else {
969
          memcpy(&dd1, input + ipos, nbytes);
2,147,483,647✔
970
        }
971
        delta_of_delta = ZIGZAG_DECODE(int64_t, dd1);
2,147,483,647✔
972
      }
973

974
      ipos += nbytes;
2,147,483,647✔
975
      if (opos == 0) {
2,147,483,647✔
976
        prev_value = delta_of_delta;
548,052,975✔
977
        prev_delta = 0;
548,052,975✔
978
        ostream[opos++] = delta_of_delta;
548,052,975✔
979
      } else {
980
        prev_delta = delta_of_delta + prev_delta;
2,147,483,647✔
981
        prev_value = prev_value + prev_delta;
2,147,483,647✔
982
        ostream[opos++] = prev_value;
2,147,483,647✔
983
      }
984
      if (opos == nelements) return nelements * longBytes;
2,147,483,647✔
985

986
      // Decode dd2
987
      uint64_t dd2 = 0;
2,147,483,647✔
988
      nbytes = (flags >> 4) & INT8MASK(4);
2,147,483,647✔
989
      if (nbytes == 0) {
2,147,483,647✔
990
        delta_of_delta = 0;
2,147,483,647✔
991
      } else {
992
        if (is_bigendian()) {
2,147,483,647✔
993
          memcpy(((char *)(&dd2)) + longBytes - nbytes, input + ipos, nbytes);
×
994
        } else {
995
          memcpy(&dd2, input + ipos, nbytes);
2,147,483,647✔
996
        }
997
        // zigzag_decoding
998
        delta_of_delta = ZIGZAG_DECODE(int64_t, dd2);
2,147,483,647✔
999
      }
1000
      ipos += nbytes;
2,147,483,647✔
1001
      prev_delta = delta_of_delta + prev_delta;
2,147,483,647✔
1002
      prev_value = prev_value + prev_delta;
2,147,483,647✔
1003
      ostream[opos++] = prev_value;
2,147,483,647✔
1004
      if (opos == nelements) return nelements * longBytes;
2,147,483,647✔
1005
    }
1006
  }
1007

1008
  return nelements * longBytes;
49✔
1009
}
1010

1011
int32_t tsCompressPlain2(const char *const input, const int32_t nelements, char *const output, const char type) {
×
1012
  int32_t bytes = tDataTypes[type].bytes * nelements;
×
1013
  output[0] = 0;
×
1014
  memcpy(output + 1, input, bytes);
×
1015
  return bytes + 1;
×
1016
}
1017
int32_t tsDecompressPlain2(const char *const input, int32_t ninput, const int32_t nelements, char *const output,
×
1018
                           const char type) {
1019
  int32_t bytes = tDataTypes[type].bytes * nelements;
×
1020
  memcpy(output, input + 1, bytes);
×
1021
  return bytes;
×
1022
}
1023
int32_t tsCompressTimestampImp2(const char *const input, const int32_t nelements, char *const output, const char type) {
183,560,433✔
1024
  return tsCompressTimestampImp(input, nelements, output);
183,560,433✔
1025
}
1026
int32_t tsDecompressTimestampImp2(const char *const input, int32_t ninput, const int32_t nelements, char *const output,
618,656,488✔
1027
                                  const char type) {
1028
  return tsDecompressTimestampImp(input, nelements, output);
618,656,488✔
1029
}
1030

1031
/* --------------------------------------------Double Compression ---------------------------------------------- */
1032
void encodeDoubleValue(uint64_t diff, uint8_t flag, char *const output, int32_t *const pos) {
5,255,792✔
1033
  int32_t longBytes = LONG_BYTES;
5,255,792✔
1034

1035
  uint8_t nbytes = (flag & INT8MASK(3)) + 1;
5,255,792✔
1036
  int32_t nshift = (longBytes * BITS_PER_BYTE - nbytes * BITS_PER_BYTE) * (flag >> 3);
5,255,792✔
1037
  diff >>= nshift;
5,255,792✔
1038

1039
  while (nbytes) {
18,241,154✔
1040
    output[(*pos)++] = (int8_t)(diff & INT64MASK(8));
12,985,362✔
1041
    diff >>= BITS_PER_BYTE;
12,985,362✔
1042
    nbytes--;
12,985,362✔
1043
  }
1044
}
5,255,792✔
1045

1046
int32_t tsCompressDoubleImp(const char *const input, const int32_t nelements, char *const output) {
4,208✔
1047
  int32_t byte_limit = nelements * DOUBLE_BYTES + 1;
4,208✔
1048
  int32_t opos = 1;
4,208✔
1049

1050
  uint64_t prev_value = 0;
4,208✔
1051
  uint64_t prev_diff = 0;
4,208✔
1052
  uint8_t  prev_flag = 0;
4,208✔
1053

1054
  double *istream = (double *)input;
4,208✔
1055

1056
  // Main loop
1057
  for (int32_t i = 0; i < nelements; i++) {
5,258,948✔
1058
    union {
1059
      double   real;
1060
      uint64_t bits;
1061
    } curr;
1062

1063
    curr.real = istream[i];
5,254,740✔
1064

1065
    // Here we assume the next value is the same as previous one.
1066
    uint64_t predicted = prev_value;
5,254,740✔
1067
    uint64_t diff = curr.bits ^ predicted;
5,254,740✔
1068

1069
    int32_t leading_zeros = LONG_BYTES * BITS_PER_BYTE;
5,254,740✔
1070
    int32_t trailing_zeros = leading_zeros;
5,254,740✔
1071

1072
    if (diff) {
5,254,740✔
1073
      trailing_zeros = BUILDIN_CTZL(diff);
4,730,318✔
1074
      leading_zeros = BUILDIN_CLZL(diff);
4,730,318✔
1075
    }
1076

1077
    uint8_t nbytes = 0;
5,254,740✔
1078
    uint8_t flag;
1079

1080
    if (trailing_zeros > leading_zeros) {
5,254,740✔
1081
      nbytes = (uint8_t)(LONG_BYTES - trailing_zeros / BITS_PER_BYTE);
4,730,318✔
1082

1083
      if (nbytes > 0) nbytes--;
4,730,318✔
1084
      flag = ((uint8_t)1 << 3) | nbytes;
4,730,318✔
1085
    } else {
1086
      nbytes = (uint8_t)(LONG_BYTES - leading_zeros / BITS_PER_BYTE);
524,422✔
1087
      if (nbytes > 0) nbytes--;
524,422✔
1088
      flag = nbytes;
524,422✔
1089
    }
1090

1091
    if (i % 2 == 0) {
5,254,740✔
1092
      prev_diff = diff;
2,627,896✔
1093
      prev_flag = flag;
2,627,896✔
1094
    } else {
1095
      int32_t nbyte1 = (prev_flag & INT8MASK(3)) + 1;
2,626,844✔
1096
      int32_t nbyte2 = (flag & INT8MASK(3)) + 1;
2,626,844✔
1097
      if (opos + 1 + nbyte1 + nbyte2 <= byte_limit) {
2,626,844✔
1098
        uint8_t flags = prev_flag | (flag << 4);
2,626,844✔
1099
        output[opos++] = flags;
2,626,844✔
1100
        encodeDoubleValue(prev_diff, prev_flag, output, &opos);
2,626,844✔
1101
        encodeDoubleValue(diff, flag, output, &opos);
2,626,844✔
1102
      } else {
1103
        output[0] = 1;
×
1104
        memcpy(output + 1, input, byte_limit - 1);
×
1105
        return byte_limit;
×
1106
      }
1107
    }
1108
    prev_value = curr.bits;
5,254,740✔
1109
  }
1110

1111
  if (nelements % 2) {
4,208✔
1112
    int32_t nbyte1 = (prev_flag & INT8MASK(3)) + 1;
1,052✔
1113
    int32_t nbyte2 = 1;
1,052✔
1114
    if (opos + 1 + nbyte1 + nbyte2 <= byte_limit) {
1,052✔
1115
      uint8_t flags = prev_flag;
1,052✔
1116
      output[opos++] = flags;
1,052✔
1117
      encodeDoubleValue(prev_diff, prev_flag, output, &opos);
1,052✔
1118
      encodeDoubleValue(0ul, 0, output, &opos);
1,052✔
1119
    } else {
1120
      output[0] = 1;
×
1121
      memcpy(output + 1, input, byte_limit - 1);
×
1122
      return byte_limit;
×
1123
    }
1124
  }
1125

1126
  output[0] = 0;
4,208✔
1127
  return opos;
4,208✔
1128
}
1129

1130
FORCE_INLINE uint64_t decodeDoubleValue(const char *const input, int32_t *const ipos, uint8_t flag) {
1131
  int32_t longBytes = LONG_BYTES;
×
1132

1133
  uint64_t diff = 0ul;
×
1134
  int32_t  nbytes = (flag & 0x7) + 1;
×
1135
  for (int32_t i = 0; i < nbytes; i++) {
×
1136
    diff |= (((uint64_t)0xff & input[(*ipos)++]) << BITS_PER_BYTE * i);
×
1137
  }
1138
  int32_t shift_width = (longBytes * BITS_PER_BYTE - nbytes * BITS_PER_BYTE) * (flag >> 3);
×
1139
  diff <<= shift_width;
×
1140

1141
  return diff;
×
1142
}
1143

1144
static int32_t tsDecompressDoubleImpHelper(const char *input, int32_t nelements, char *output) {
×
1145
  double  *ostream = (double *)output;
×
1146
  uint8_t  flags = 0;
×
1147
  int32_t  ipos = 0;
×
1148
  int32_t  opos = 0;
×
1149
  uint64_t diff = 0;
×
1150
  union {
1151
    uint64_t bits;
1152
    double   real;
1153
  } curr;
1154

1155
  curr.bits = 0;
×
1156

1157
  for (int32_t i = 0; i < nelements; i++) {
×
1158
    if ((i & 0x01) == 0) {
×
1159
      flags = input[ipos++];
×
1160
    }
1161

1162
    diff = decodeDoubleValue(input, &ipos, flags & INT8MASK(4));
×
1163
    flags >>= 4;
×
1164
    curr.bits ^= diff;
×
1165

1166
    ostream[opos++] = curr.real;
×
1167
  }
1168

1169
  return nelements * DOUBLE_BYTES;
×
1170
}
1171

1172
int32_t tsDecompressDoubleImp(const char *const input, int32_t ninput, const int32_t nelements, char *const output) {
×
1173
  // return the result directly if there is no compression
1174
  if (input[0] == 1) {
×
1175
    memcpy(output, input + 1, nelements * DOUBLE_BYTES);
×
1176
    return nelements * DOUBLE_BYTES;
×
1177
  }
1178

1179
  // use AVX2 implementation when allowed and the compression ratio is not high
1180
  double compressRatio = 1.0 * nelements * DOUBLE_BYTES / ninput;
×
1181
  if (tsSIMDEnable && tsAVX2Supported && compressRatio < 2) {
×
1182
    int32_t cnt = tsDecompressDoubleImpAvx2(input + 1, nelements, output);
×
1183
    if (cnt >= 0) {
×
1184
      return cnt;
×
1185
    }
1186
  }
1187

1188
  // use implementation without SIMD instructions by default
1189
  return tsDecompressDoubleImpHelper(input + 1, nelements, output);
×
1190
}
1191

1192
/* --------------------------------------------Float Compression ---------------------------------------------- */
1193
void encodeFloatValue(uint32_t diff, uint8_t flag, char *const output, int32_t *const pos) {
1,779,698,942✔
1194
  uint8_t nbytes = (flag & INT8MASK(3)) + 1;
1,779,698,942✔
1195
  int32_t nshift = (FLOAT_BYTES * BITS_PER_BYTE - nbytes * BITS_PER_BYTE) * (flag >> 3);
1,779,698,942✔
1196
  diff >>= nshift;
1,779,698,942✔
1197

1198
  while (nbytes) {
2,147,483,647✔
1199
    output[(*pos)++] = (int8_t)(diff & INT32MASK(8));
2,147,483,647✔
1200
    diff >>= BITS_PER_BYTE;
2,147,483,647✔
1201
    nbytes--;
2,147,483,647✔
1202
  }
1203
}
1,779,698,942✔
1204

1205
int32_t tsCompressFloatImp(const char *const input, const int32_t nelements, char *const output) {
475,215✔
1206
  float  *istream = (float *)input;
475,215✔
1207
  int32_t byte_limit = nelements * FLOAT_BYTES + 1;
475,215✔
1208
  int32_t opos = 1;
475,215✔
1209

1210
  uint32_t prev_value = 0;
475,215✔
1211
  uint32_t prev_diff = 0;
475,215✔
1212
  uint8_t  prev_flag = 0;
475,215✔
1213

1214
  // Main loop
1215
  for (int32_t i = 0; i < nelements; i++) {
1,780,191,907✔
1216
    union {
1217
      float    real;
1218
      uint32_t bits;
1219
    } curr;
1220

1221
    curr.real = istream[i];
1,779,744,676✔
1222

1223
    // Here we assume the next value is the same as previous one.
1224
    uint32_t predicted = prev_value;
1,779,744,676✔
1225
    uint32_t diff = curr.bits ^ predicted;
1,779,744,676✔
1226

1227
    int32_t clz = FLOAT_BYTES * BITS_PER_BYTE;
1,779,744,676✔
1228
    int32_t ctz = clz;
1,779,744,676✔
1229

1230
    if (diff) {
1,779,744,676✔
1231
      ctz = BUILDIN_CTZ(diff);
1,755,785,121✔
1232
      clz = BUILDIN_CLZ(diff);
1,755,785,121✔
1233
    }
1234

1235
    uint8_t nbytes = 0;
1,779,744,676✔
1236
    uint8_t flag;
1237

1238
    if (ctz > clz) {
1,779,744,676✔
1239
      nbytes = (uint8_t)(FLOAT_BYTES - ctz / BITS_PER_BYTE);
6,632,422✔
1240

1241
      if (nbytes > 0) nbytes--;
6,632,422✔
1242
      flag = ((uint8_t)1 << 3) | nbytes;
6,632,422✔
1243
    } else {
1244
      nbytes = (uint8_t)(FLOAT_BYTES - clz / BITS_PER_BYTE);
1,773,112,254✔
1245
      if (nbytes > 0) nbytes--;
1,773,112,254✔
1246
      flag = nbytes;
1,773,112,254✔
1247
    }
1248

1249
    if (i % 2 == 0) {
1,779,744,676✔
1250
      prev_diff = diff;
889,877,455✔
1251
      prev_flag = flag;
889,877,455✔
1252
    } else {
1253
      int32_t nbyte1 = (prev_flag & INT8MASK(3)) + 1;
889,867,221✔
1254
      int32_t nbyte2 = (flag & INT8MASK(3)) + 1;
889,867,221✔
1255
      if (opos + 1 + nbyte1 + nbyte2 <= byte_limit) {
889,867,221✔
1256
        uint8_t flags = prev_flag | (flag << 4);
889,839,237✔
1257
        output[opos++] = flags;
889,839,237✔
1258
        encodeFloatValue(prev_diff, prev_flag, output, &opos);
889,839,237✔
1259
        encodeFloatValue(diff, flag, output, &opos);
889,839,237✔
1260
      } else {
1261
        output[0] = 1;
27,984✔
1262
        memcpy(output + 1, input, byte_limit - 1);
27,984✔
1263
        return byte_limit;
27,984✔
1264
      }
1265
    }
1266
    prev_value = curr.bits;
1,779,716,692✔
1267
  }
1268

1269
  if (nelements % 2) {
447,231✔
1270
    int32_t nbyte1 = (prev_flag & INT8MASK(3)) + 1;
10,234✔
1271
    int32_t nbyte2 = 1;
10,234✔
1272
    if (opos + 1 + nbyte1 + nbyte2 <= byte_limit) {
10,234✔
1273
      uint8_t flags = prev_flag;
10,234✔
1274
      output[opos++] = flags;
10,234✔
1275
      encodeFloatValue(prev_diff, prev_flag, output, &opos);
10,234✔
1276
      encodeFloatValue(0, 0, output, &opos);
10,234✔
1277
    } else {
1278
      output[0] = 1;
×
1279
      memcpy(output + 1, input, byte_limit - 1);
×
1280
      return byte_limit;
×
1281
    }
1282
  }
1283

1284
  output[0] = 0;
447,231✔
1285
  return opos;
447,231✔
1286
}
1287

1288
uint32_t decodeFloatValue(const char *const input, int32_t *const ipos, uint8_t flag) {
2,147,483,647✔
1289
  uint32_t diff = 0ul;
2,147,483,647✔
1290
  int32_t  nbytes = (flag & INT8MASK(3)) + 1;
2,147,483,647✔
1291
  for (int32_t i = 0; i < nbytes; i++) {
2,147,483,647✔
1292
    diff = diff | ((INT32MASK(8) & input[(*ipos)++]) << BITS_PER_BYTE * i);
2,147,483,647✔
1293
  }
1294
  int32_t shift_width = (FLOAT_BYTES * BITS_PER_BYTE - nbytes * BITS_PER_BYTE) * (flag >> 3);
2,147,483,647✔
1295
  diff <<= shift_width;
2,147,483,647✔
1296

1297
  return diff;
2,147,483,647✔
1298
}
1299

1300
static int32_t tsDecompressFloatImpHelper(const char *input, int32_t nelements, char *output) {
781,385✔
1301
  float   *ostream = (float *)output;
781,385✔
1302
  uint8_t  flags = 0;
781,385✔
1303
  int32_t  ipos = 0;
781,385✔
1304
  int32_t  opos = 0;
781,385✔
1305
  uint32_t diff = 0;
781,385✔
1306
  union {
1307
    uint32_t bits;
1308
    float    real;
1309
  } curr;
1310

1311
  curr.bits = 0;
781,385✔
1312

1313
  for (int32_t i = 0; i < nelements; i++) {
2,147,483,647✔
1314
    if (i % 2 == 0) {
2,147,483,647✔
1315
      flags = input[ipos++];
1,566,605,328✔
1316
    }
1317

1318
    diff = decodeFloatValue(input, &ipos, flags & INT8MASK(4));
2,147,483,647✔
1319
    flags >>= 4;
2,147,483,647✔
1320
    curr.bits ^= diff;
2,147,483,647✔
1321

1322
    ostream[opos++] = curr.real;
2,147,483,647✔
1323
  }
1324

1325
  return nelements * FLOAT_BYTES;
781,385✔
1326
}
1327

1328
int32_t tsDecompressFloatImp(const char *const input, int32_t ninput, const int32_t nelements, char *const output) {
1,337,681✔
1329
  if (input[0] == 1) {
1,337,681✔
1330
    memcpy(output, input + 1, nelements * FLOAT_BYTES);
29,240✔
1331
    return nelements * FLOAT_BYTES;
29,240✔
1332
  }
1333

1334
  // use AVX2 implementation when allowed and the compression ratio is not high
1335
  double compressRatio = 1.0 * nelements * FLOAT_BYTES / ninput;
1,308,441✔
1336
  if (tsSIMDEnable && tsAVX2Supported && compressRatio < 2) {
1,308,441✔
1337
    int32_t cnt = tsDecompressFloatImpAvx2(input + 1, nelements, output);
527,056✔
1338
    if (cnt >= 0) {
527,056✔
1339
      return cnt;
527,056✔
1340
    }
1341
  }
1342

1343
  // use implementation without SIMD instructions by default
1344
  return tsDecompressFloatImpHelper(input + 1, nelements, output);
781,385✔
1345
}
1346

1347
//
1348
//   ----------  float double lossy  -----------
1349
//
1350
int32_t tsCompressFloatLossyImp(const char *input, const int32_t nelements, char *const output) {
×
1351
  // compress with sz
1352
  int32_t       compressedSize = tdszCompress(SZ_FLOAT, input, nelements, output + 1);
×
1353
  unsigned char algo = ALGO_SZ_LOSSY << 1;
×
1354
  if (compressedSize == 0 || compressedSize >= nelements * sizeof(float)) {
×
1355
    // compressed error or large than original
1356
    output[0] = MODE_NOCOMPRESS | algo;
×
1357
    memcpy(output + 1, input, nelements * sizeof(float));
×
1358
    compressedSize = 1 + nelements * sizeof(float);
×
1359
  } else {
1360
    // compressed successfully
1361
    output[0] = MODE_COMPRESS | algo;
×
1362
    compressedSize += 1;
×
1363
  }
1364

1365
  return compressedSize;
×
1366
}
1367

1368
int32_t tsDecompressFloatLossyImp(const char *input, int32_t compressedSize, const int32_t nelements,
×
1369
                                  char *const output) {
1370
  int32_t decompressedSize = 0;
×
1371
  if (HEAD_MODE(input[0]) == MODE_NOCOMPRESS) {
×
1372
    // orginal so memcpy directly
1373
    decompressedSize = nelements * sizeof(float);
×
1374
    memcpy(output, input + 1, decompressedSize);
×
1375

1376
    return decompressedSize;
×
1377
  }
1378

1379
  // decompressed with sz
1380
  return tdszDecompress(SZ_FLOAT, input + 1, compressedSize - 1, nelements, output);
×
1381
}
1382

1383
int32_t tsCompressDoubleLossyImp(const char *input, const int32_t nelements, char *const output) {
434,704✔
1384
  // compress with sz
1385
  int32_t       compressedSize = tdszCompress(SZ_DOUBLE, input, nelements, output + 1);
434,704✔
1386
  unsigned char algo = ALGO_SZ_LOSSY << 1;
434,704✔
1387
  if (compressedSize == 0 || compressedSize >= nelements * sizeof(double)) {
434,704✔
1388
    // compressed error or large than original
1389
    output[0] = MODE_NOCOMPRESS | algo;
×
1390
    memcpy(output + 1, input, nelements * sizeof(double));
×
1391
    compressedSize = 1 + nelements * sizeof(double);
×
1392
  } else {
1393
    // compressed successfully
1394
    output[0] = MODE_COMPRESS | algo;
434,704✔
1395
    compressedSize += 1;
434,704✔
1396
  }
1397

1398
  return compressedSize;
434,704✔
1399
}
1400

1401
int32_t tsDecompressDoubleLossyImp(const char *input, int32_t compressedSize, const int32_t nelements,
665,892✔
1402
                                   char *const output) {
1403
  int32_t decompressedSize = 0;
665,892✔
1404
  if (HEAD_MODE(input[0]) == MODE_NOCOMPRESS) {
665,892✔
1405
    // orginal so memcpy directly
1406
    decompressedSize = nelements * sizeof(double);
×
1407
    memcpy(output, input + 1, decompressedSize);
×
1408

1409
    return decompressedSize;
×
1410
  }
1411

1412
  // decompressed with sz
1413
  return tdszDecompress(SZ_DOUBLE, input + 1, compressedSize - 1, nelements, output);
665,892✔
1414
}
1415

1416
/*************************************************************************
1417
 *                  REGULAR COMPRESSION
1418
 *************************************************************************/
1419
// Timestamp =====================================================
1420
int32_t tsCompressTimestamp(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint8_t cmprAlg, void *pBuf,
54,908✔
1421
                            int32_t nBuf) {
1422
  if (cmprAlg == ONE_STAGE_COMP) {
54,908✔
1423
    return tsCompressTimestampImp(pIn, nEle, pOut);
×
1424
  } else if (cmprAlg == TWO_STAGE_COMP) {
54,908✔
1425
    int32_t len = tsCompressTimestampImp(pIn, nEle, pBuf);
54,908✔
1426
    return tsCompressStringImp(pBuf, len, pOut, nOut);
54,908✔
1427
  } else {
1428
    // tDataTypeCompress[TSDB_DATA_TYPE_TIMESTAMP].compFunc(pIn, nIn, nEle, pOut, nOut, );
1429
  }
1430
  return 0;
×
1431
}
1432

1433
int32_t tsDecompressTimestamp(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint8_t cmprAlg,
1,652✔
1434
                              void *pBuf, int32_t nBuf) {
1435
  if (cmprAlg == ONE_STAGE_COMP) {
1,652✔
1436
    return tsDecompressTimestampImp(pIn, nEle, pOut);
×
1437
  } else if (cmprAlg == TWO_STAGE_COMP) {
1,652✔
1438
    if (tsDecompressStringImp(pIn, nIn, pBuf, nBuf) < 0) return -1;
1,652✔
1439
    return tsDecompressTimestampImp(pBuf, nEle, pOut);
1,652✔
1440
  } else {
1441
    return -1;
×
1442
  }
1443
}
1444

1445
// Float =====================================================
1446
int32_t tsCompressFloat(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint8_t cmprAlg, void *pBuf,
×
1447
                        int32_t nBuf) {
1448
  // lossy mode
1449
  if (lossyFloat) {
×
1450
    return tsCompressFloatLossyImp(pIn, nEle, pOut);
×
1451
    // lossless mode
1452
  } else {
1453
    if (cmprAlg == ONE_STAGE_COMP) {
×
1454
      return tsCompressFloatImp(pIn, nEle, pOut);
×
1455
    } else if (cmprAlg == TWO_STAGE_COMP) {
×
1456
      int32_t len = tsCompressFloatImp(pIn, nEle, pBuf);
×
1457
      return tsCompressStringImp(pBuf, len, pOut, nOut);
×
1458
    } else {
1459
      return TSDB_CODE_INVALID_PARA;
×
1460
    }
1461
  }
1462
}
1463

1464
int32_t tsDecompressFloat(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint8_t cmprAlg, void *pBuf,
×
1465
                          int32_t nBuf) {
1466
  if (HEAD_ALGO(((uint8_t *)pIn)[0]) == ALGO_SZ_LOSSY) {
×
1467
    // decompress lossy
1468
    return tsDecompressFloatLossyImp(pIn, nIn, nEle, pOut);
×
1469
  } else {
1470
    // decompress lossless
1471
    if (cmprAlg == ONE_STAGE_COMP) {
×
1472
      return tsDecompressFloatImp(pIn, nIn, nEle, pOut);
×
1473
    } else if (cmprAlg == TWO_STAGE_COMP) {
×
1474
      int32_t bufLen = tsDecompressStringImp(pIn, nIn, pBuf, nBuf);
×
1475
      if (bufLen < 0) return -1;
×
1476
      return tsDecompressFloatImp(pBuf, bufLen, nEle, pOut);
×
1477
    } else {
1478
      return TSDB_CODE_INVALID_PARA;
×
1479
    }
1480
  }
1481
}
1482

1483
// Double =====================================================
1484
int32_t tsCompressDouble(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint8_t cmprAlg, void *pBuf,
×
1485
                         int32_t nBuf) {
1486
  if (lossyDouble) {
×
1487
    // lossy mode
1488
    return tsCompressDoubleLossyImp(pIn, nEle, pOut);
×
1489
  } else {
1490
    // lossless mode
1491
    if (cmprAlg == ONE_STAGE_COMP) {
×
1492
      return tsCompressDoubleImp(pIn, nEle, pOut);
×
1493
    } else if (cmprAlg == TWO_STAGE_COMP) {
×
1494
      int32_t len = tsCompressDoubleImp(pIn, nEle, pBuf);
×
1495
      return tsCompressStringImp(pBuf, len, pOut, nOut);
×
1496
    } else {
1497
      return TSDB_CODE_INVALID_PARA;
×
1498
    }
1499
  }
1500
}
1501

1502
int32_t tsDecompressDouble(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint8_t cmprAlg, void *pBuf,
×
1503
                           int32_t nBuf) {
1504
  if (HEAD_ALGO(((uint8_t *)pIn)[0]) == ALGO_SZ_LOSSY) {
×
1505
    // decompress lossy
1506
    return tsDecompressDoubleLossyImp(pIn, nIn, nEle, pOut);
×
1507
  } else {
1508
    // decompress lossless
1509
    if (cmprAlg == ONE_STAGE_COMP) {
×
1510
      return tsDecompressDoubleImp(pIn, nIn, nEle, pOut);
×
1511
    } else if (cmprAlg == TWO_STAGE_COMP) {
×
1512
      int32_t bufLen = tsDecompressStringImp(pIn, nIn, pBuf, nBuf);
×
1513
      if (bufLen < 0) return -1;
×
1514
      return tsDecompressDoubleImp(pBuf, bufLen, nEle, pOut);
×
1515
    } else {
1516
      return TSDB_CODE_INVALID_PARA;
×
1517
    }
1518
  }
1519
}
1520

1521
// Binary =====================================================
1522
int32_t tsCompressString(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint8_t cmprAlg, void *pBuf,
298,230✔
1523
                         int32_t nBuf) {
1524
  return tsCompressStringImp(pIn, nIn, pOut, nOut);
298,230✔
1525
}
1526

1527
int32_t tsDecompressString(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint8_t cmprAlg, void *pBuf,
29,146,410✔
1528
                           int32_t nBuf) {
1529
  return tsDecompressStringImp(pIn, nIn, pOut, nOut);
29,146,410✔
1530
}
1531

1532
// Bool =====================================================
1533
int32_t tsCompressBool(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint8_t cmprAlg, void *pBuf,
×
1534
                       int32_t nBuf) {
1535
  if (cmprAlg == ONE_STAGE_COMP) {
×
1536
    return tsCompressBoolImp(pIn, nEle, pOut);
×
1537
  } else if (cmprAlg == TWO_STAGE_COMP) {
×
1538
    int32_t len = tsCompressBoolImp(pIn, nEle, pBuf);
×
1539
    if (len < 0) {
×
1540
      return TSDB_CODE_THIRDPARTY_ERROR;
×
1541
    }
1542
    return tsCompressStringImp(pBuf, len, pOut, nOut);
×
1543
  } else {
1544
    return TSDB_CODE_THIRDPARTY_ERROR;
×
1545
  }
1546
}
1547

1548
int32_t tsDecompressBool(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint8_t cmprAlg, void *pBuf,
×
1549
                         int32_t nBuf) {
1550
  int32_t code = 0;
×
1551
  if (cmprAlg == ONE_STAGE_COMP) {
×
1552
    return tsDecompressBoolImp(pIn, nEle, pOut);
×
1553
  } else if (cmprAlg == TWO_STAGE_COMP) {
×
1554
    if ((code = tsDecompressStringImp(pIn, nIn, pBuf, nBuf)) < 0) return code;
×
1555
    return tsDecompressBoolImp(pBuf, nEle, pOut);
×
1556
  } else {
1557
    return TSDB_CODE_INVALID_PARA;
×
1558
  }
1559
}
1560

1561
// Tinyint =====================================================
1562
int32_t tsCompressTinyint(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint8_t cmprAlg, void *pBuf,
29,150✔
1563
                          int32_t nBuf) {
1564
  if (cmprAlg == ONE_STAGE_COMP) {
29,150✔
1565
    return tsCompressINTImp(pIn, nEle, pOut, TSDB_DATA_TYPE_TINYINT);
×
1566
  } else if (cmprAlg == TWO_STAGE_COMP) {
29,150✔
1567
    int32_t len = tsCompressINTImp(pIn, nEle, pBuf, TSDB_DATA_TYPE_TINYINT);
29,150✔
1568
    if (len < 0) {
29,150✔
1569
      return TSDB_CODE_THIRDPARTY_ERROR;
×
1570
    }
1571
    return tsCompressStringImp(pBuf, len, pOut, nOut);
29,150✔
1572
  } else {
1573
    return TSDB_CODE_INVALID_PARA;
×
1574
  }
1575
}
1576

1577
int32_t tsDecompressTinyint(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint8_t cmprAlg, void *pBuf,
1,166✔
1578
                            int32_t nBuf) {
1579
  int32_t code = 0;
1,166✔
1580
  if (cmprAlg == ONE_STAGE_COMP) {
1,166✔
1581
    return tsDecompressINTImp(pIn, nEle, pOut, TSDB_DATA_TYPE_TINYINT);
×
1582
  } else if (cmprAlg == TWO_STAGE_COMP) {
1,166✔
1583
    if ((code = tsDecompressStringImp(pIn, nIn, pBuf, nBuf)) < 0) return code;
1,166✔
1584
    return tsDecompressINTImp(pBuf, nEle, pOut, TSDB_DATA_TYPE_TINYINT);
1,166✔
1585
  } else {
1586
    return TSDB_CODE_INVALID_PARA;
×
1587
  }
1588
}
1589

1590
// Smallint =====================================================
1591
int32_t tsCompressSmallint(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint8_t cmprAlg, void *pBuf,
29,150✔
1592
                           int32_t nBuf) {
1593
  if (cmprAlg == ONE_STAGE_COMP) {
29,150✔
1594
    return tsCompressINTImp(pIn, nEle, pOut, TSDB_DATA_TYPE_SMALLINT);
×
1595
  } else if (cmprAlg == TWO_STAGE_COMP) {
29,150✔
1596
    int32_t len = tsCompressINTImp(pIn, nEle, pBuf, TSDB_DATA_TYPE_SMALLINT);
29,150✔
1597
    if (len < 0) {
29,150✔
1598
      return TSDB_CODE_THIRDPARTY_ERROR;
×
1599
    }
1600
    return tsCompressStringImp(pBuf, len, pOut, nOut);
29,150✔
1601
  } else {
1602
    return TSDB_CODE_INVALID_PARA;
×
1603
  }
1604
}
1605

1606
int32_t tsDecompressSmallint(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint8_t cmprAlg,
1,166✔
1607
                             void *pBuf, int32_t nBuf) {
1608
  int32_t code = 0;
1,166✔
1609
  if (cmprAlg == ONE_STAGE_COMP) {
1,166✔
1610
    return tsDecompressINTImp(pIn, nEle, pOut, TSDB_DATA_TYPE_SMALLINT);
×
1611
  } else if (cmprAlg == TWO_STAGE_COMP) {
1,166✔
1612
    if ((code = tsDecompressStringImp(pIn, nIn, pBuf, nBuf)) < 0) return code;
1,166✔
1613
    return tsDecompressINTImp(pBuf, nEle, pOut, TSDB_DATA_TYPE_SMALLINT);
1,166✔
1614
  } else {
1615
    return TSDB_CODE_INVALID_PARA;
×
1616
  }
1617
}
1618

1619
// Int =====================================================
1620
int32_t tsCompressInt(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint8_t cmprAlg, void *pBuf,
4,042,111✔
1621
                      int32_t nBuf) {
1622
  if (cmprAlg == ONE_STAGE_COMP) {
4,042,111✔
1623
    return tsCompressINTImp(pIn, nEle, pOut, TSDB_DATA_TYPE_INT);
47,880✔
1624
  } else if (cmprAlg == TWO_STAGE_COMP) {
3,994,231✔
1625
    int32_t len = tsCompressINTImp(pIn, nEle, pBuf, TSDB_DATA_TYPE_INT);
3,994,231✔
1626
    if (len < 0) {
3,994,673✔
1627
      return TSDB_CODE_THIRDPARTY_ERROR;
×
1628
    }
1629
    return tsCompressStringImp(pBuf, len, pOut, nOut);
3,994,673✔
1630
  } else {
1631
    return TSDB_CODE_INVALID_PARA;
×
1632
  }
1633
}
1634

1635
int32_t tsDecompressInt(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint8_t cmprAlg, void *pBuf,
141,260,033✔
1636
                        int32_t nBuf) {
1637
  int32_t code = 0;
141,260,033✔
1638
  if (cmprAlg == ONE_STAGE_COMP) {
141,260,033✔
1639
    return tsDecompressINTImp(pIn, nEle, pOut, TSDB_DATA_TYPE_INT);
297,920✔
1640
  } else if (cmprAlg == TWO_STAGE_COMP) {
140,962,113✔
1641
    if ((code = tsDecompressStringImp(pIn, nIn, pBuf, nBuf)) < 0) return code;
140,965,900✔
1642
    return tsDecompressINTImp(pBuf, nEle, pOut, TSDB_DATA_TYPE_INT);
140,965,284✔
1643
  } else {
1644
    return TSDB_CODE_INVALID_PARA;
×
1645
  }
1646
}
1647

1648
// Bigint =====================================================
1649
int32_t tsCompressBigint(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint8_t cmprAlg, void *pBuf,
101,249,553✔
1650
                         int32_t nBuf) {
1651
  if (cmprAlg == ONE_STAGE_COMP) {
101,249,553✔
1652
    return tsCompressINTImp(pIn, nEle, pOut, TSDB_DATA_TYPE_BIGINT);
103,740✔
1653
  } else if (cmprAlg == TWO_STAGE_COMP) {
101,145,813✔
1654
    int32_t len = tsCompressINTImp(pIn, nEle, pBuf, TSDB_DATA_TYPE_BIGINT);
101,155,403✔
1655
    if (len < 0) {
101,153,903✔
1656
      return TSDB_CODE_THIRDPARTY_ERROR;
×
1657
    }
1658
    return tsCompressStringImp(pBuf, len, pOut, nOut);
101,153,903✔
1659
  } else {
1660
    return TSDB_CODE_INVALID_PARA;
×
1661
  }
1662
}
1663

1664
int32_t tsDecompressBigint(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint8_t cmprAlg, void *pBuf,
1,275,454,451✔
1665
                           int32_t nBuf) {
1666
  int32_t code = 0;
1,275,454,451✔
1667
  if (cmprAlg == ONE_STAGE_COMP) {
1,275,454,451✔
1668
    return tsDecompressINTImp(pIn, nEle, pOut, TSDB_DATA_TYPE_BIGINT);
606,480✔
1669
  } else if (cmprAlg == TWO_STAGE_COMP) {
1,274,847,971✔
1670
    if ((code = tsDecompressStringImp(pIn, nIn, pBuf, nBuf)) < 0) return code;
1,274,893,221✔
1671
    return tsDecompressINTImp(pBuf, nEle, pOut, TSDB_DATA_TYPE_BIGINT);
1,274,769,899✔
1672
  } else {
1673
    return TSDB_CODE_INVALID_PARA;
×
1674
  }
1675
}
1676

1677
#define FUNC_COMPRESS_IMPL(pIn, nIn, nEle, pOut, nOut, alg, pBuf, nBuf, type, compress)                               \
1678
  do {                                                                                                                \
1679
    DEFINE_VAR(alg)                                                                                                   \
1680
    if (l1 != L1_DISABLED && l2 == L2_DISABLED) {                                                                     \
1681
      if (compress) {                                                                                                 \
1682
        uTrace("encode:%s, compress:%s, level:%s, type:%s", compressL1Dict[l1].name, "disabled", "disabled",          \
1683
               tDataTypes[type].name);                                                                                \
1684
        return compressL1Dict[l1].comprFn(pIn, nEle, pOut, type);                                                     \
1685
      } else {                                                                                                        \
1686
        uTrace("dencode:%s, compress:%s, level:%s, type:%s", compressL1Dict[l1].name, "disabled", "disabled",         \
1687
               tDataTypes[type].name);                                                                                \
1688
        return compressL1Dict[l1].decomprFn(pIn, nIn, nEle, pOut, type);                                              \
1689
      }                                                                                                               \
1690
    } else if (l1 != L1_DISABLED && l2 != L2_DISABLED) {                                                              \
1691
      if (compress) {                                                                                                 \
1692
        uTrace("encode:%s, compress:%s, level:%d, type:%s, l1:%d", compressL1Dict[l1].name, compressL2Dict[l2].name,  \
1693
               lvl, tDataTypes[type].name, l1);                                                                       \
1694
        int32_t len = compressL1Dict[l1].comprFn(pIn, nEle, pBuf, type);                                              \
1695
        if (len < 0) {                                                                                                \
1696
          return len;                                                                                                 \
1697
        }                                                                                                             \
1698
        int8_t alvl = tsGetCompressL2Level(l2, lvl);                                                                  \
1699
        return compressL2Dict[l2].comprFn(pBuf, len, pOut, nOut, type, alvl);                                         \
1700
      } else {                                                                                                        \
1701
        uTrace("dencode:%s, decompress:%s, level:%d, type:%s", compressL1Dict[l1].name, compressL2Dict[l2].name, lvl, \
1702
               tDataTypes[type].name);                                                                                \
1703
        int32_t bufLen = compressL2Dict[l2].decomprFn(pIn, nIn, pBuf, nBuf, type);                                    \
1704
        if (bufLen < 0) return -1;                                                                                    \
1705
        return compressL1Dict[l1].decomprFn(pBuf, bufLen, nEle, pOut, type);                                          \
1706
      }                                                                                                               \
1707
    } else if (l1 == L1_DISABLED && l2 != L2_DISABLED) {                                                              \
1708
      if (compress) {                                                                                                 \
1709
        uTrace("encode:%s, compress:%s, level:%d, type:%s", "disabled", "disable", lvl, tDataTypes[type].name);       \
1710
        int8_t alvl = tsGetCompressL2Level(l2, lvl);                                                                  \
1711
        return compressL2Dict[l2].comprFn(pIn, nIn, pOut, nOut, type, alvl);                                          \
1712
      } else {                                                                                                        \
1713
        uTrace("dencode:%s, decompress:%s, level:%d, type:%s", "disabled", compressL2Dict[l2].name, lvl,              \
1714
               tDataTypes[type].name);                                                                                \
1715
        return compressL2Dict[l2].decomprFn(pIn, nIn, pOut, nOut, type);                                              \
1716
      }                                                                                                               \
1717
    }                                                                                                                 \
1718
    return TSDB_CODE_INVALID_PARA;                                                                                    \
1719
  } while (1)
1720

1721
/*************************************************************************
1722
 *                  REGULAR COMPRESSION 2
1723
 *************************************************************************/
1724
// Timestamp =====================================================
1725
int32_t tsCompressTimestamp2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg,
78,099,919✔
1726
                             void *pBuf, int32_t nBuf) {
1727
  FUNC_COMPRESS_IMPL(pIn, nIn, nEle, pOut, nOut, cmprAlg, pBuf, nBuf, TSDB_DATA_TYPE_TIMESTAMP, 1);
78,099,919✔
1728
}
1729

1730
int32_t tsDecompressTimestamp2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg,
239,817,421✔
1731
                               void *pBuf, int32_t nBuf) {
1732
  FUNC_COMPRESS_IMPL(pIn, nIn, nEle, pOut, nOut, cmprAlg, pBuf, nBuf, TSDB_DATA_TYPE_TIMESTAMP, 0);
239,817,421✔
1733
}
1734

1735
// Float =====================================================
1736
int32_t tsCompressFloat2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf,
59,783,109✔
1737
                         int32_t nBuf) {
1738
  DEFINE_VAR(cmprAlg)
59,783,109✔
1739
  if (l2 == L2_TSZ && lvl != 0 && lossyFloat) {
59,783,109✔
1740
    return tsCompressFloatLossyImp(pIn, nEle, pOut);
×
1741
  }
1742
  FUNC_COMPRESS_IMPL(pIn, nIn, nEle, pOut, nOut, cmprAlg, pBuf, nBuf, TSDB_DATA_TYPE_FLOAT, 1);
59,785,108✔
1743
}
1744

1745
int32_t tsDecompressFloat2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf,
68,589,363✔
1746
                           int32_t nBuf) {
1747
  DEFINE_VAR(cmprAlg)
68,589,363✔
1748
  if (lvl != 0 && HEAD_ALGO(((uint8_t *)pIn)[0]) == ALGO_SZ_LOSSY && l1 != L1_BSS) {
68,589,363✔
1749
    return tsDecompressFloatLossyImp(pIn, nIn, nEle, pOut);
×
1750
  }
1751
  FUNC_COMPRESS_IMPL(pIn, nIn, nEle, pOut, nOut, cmprAlg, pBuf, nBuf, TSDB_DATA_TYPE_FLOAT, 0);
68,589,663✔
1752
}
1753

1754
// Double =====================================================
1755
int32_t tsCompressDouble2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf,
49,941,369✔
1756
                          int32_t nBuf) {
1757
  DEFINE_VAR(cmprAlg)
49,941,369✔
1758
  if (l2 == L2_TSZ && lvl != 0 && lossyDouble) {
49,941,369✔
1759
    // lossy mode
1760
    return tsCompressDoubleLossyImp(pIn, nEle, pOut);
414,185✔
1761
  }
1762
  FUNC_COMPRESS_IMPL(pIn, nIn, nEle, pOut, nOut, cmprAlg, pBuf, nBuf, TSDB_DATA_TYPE_DOUBLE, 1);
49,527,184✔
1763
}
1764

1765
int32_t tsDecompressDouble2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg,
39,334,096✔
1766
                            void *pBuf, int32_t nBuf) {
1767
  DEFINE_VAR(cmprAlg)
39,334,096✔
1768
  if (lvl != 0 && HEAD_ALGO(((uint8_t *)pIn)[0]) == ALGO_SZ_LOSSY && l1 != L1_BSS) {
39,334,096✔
1769
    // decompress lossy
1770
    return tsDecompressDoubleLossyImp(pIn, nIn, nEle, pOut);
665,892✔
1771
  }
1772
  FUNC_COMPRESS_IMPL(pIn, nIn, nEle, pOut, nOut, cmprAlg, pBuf, nBuf, TSDB_DATA_TYPE_DOUBLE, 0);
38,671,217✔
1773
}
1774

1775
// Binary =====================================================
1776
int32_t tsCompressString2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf,
48,906,814✔
1777
                          int32_t nBuf) {
1778
  DEFINE_VAR(cmprAlg)
48,906,814✔
1779
  if (l2 == L2_DISABLED) {
48,906,814✔
1780
    l2 = 0;
×
1781
  }
1782
  return compressL2Dict[l2].comprFn(pIn, nIn, pOut, nOut, TSDB_DATA_TYPE_BINARY, lvl);
48,906,814✔
1783
}
1784

1785
int32_t tsDecompressString2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg,
112,311,677✔
1786
                            void *pBuf, int32_t nBuf) {
1787
  // return 0;
1788
  DEFINE_VAR(cmprAlg)
112,311,677✔
1789
  if (l2 == L2_DISABLED) {
112,311,677✔
1790
    l2 = 0;
×
1791
  }
1792
  return compressL2Dict[l2].decomprFn(pIn, nIn, pOut, nOut, TSDB_DATA_TYPE_BINARY);
112,311,677✔
1793
}
1794

1795
// Bool =====================================================
1796
int32_t tsCompressBool2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf,
19,427,754✔
1797
                        int32_t nBuf) {
1798
  uint32_t tCmprAlg = 0;
19,427,754✔
1799
  DEFINE_VAR(cmprAlg)
19,427,754✔
1800
  if (l1 != L1_RLE) {
19,427,754✔
1801
    SET_COMPRESS(L1_RLE, l2, lvl, tCmprAlg);
1,647✔
1802
  } else {
1803
    tCmprAlg = cmprAlg;
19,426,107✔
1804
  }
1805
  FUNC_COMPRESS_IMPL(pIn, nIn, nEle, pOut, nOut, tCmprAlg, pBuf, nBuf, TSDB_DATA_TYPE_BOOL, 1);
19,427,754✔
1806
}
1807

1808
int32_t tsDecompressBool2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf,
19,292,769✔
1809
                          int32_t nBuf) {
1810
  uint32_t tCmprAlg = 0;
19,292,769✔
1811
  DEFINE_VAR(cmprAlg)
19,292,769✔
1812
  if (l1 != L1_RLE) {
19,292,769✔
1813
    SET_COMPRESS(L1_RLE, l2, lvl, tCmprAlg);
2,196✔
1814
  } else {
1815
    tCmprAlg = cmprAlg;
19,290,573✔
1816
  }
1817
  FUNC_COMPRESS_IMPL(pIn, nIn, nEle, pOut, nOut, tCmprAlg, pBuf, nBuf, TSDB_DATA_TYPE_BOOL, 0);
19,292,769✔
1818
}
1819

1820
// Tinyint =====================================================
1821
int32_t tsCompressTinyint2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf,
51,310,760✔
1822
                           int32_t nBuf) {
1823
  uint32_t tCmprAlg = 0;
51,310,760✔
1824
  DEFINE_VAR(cmprAlg)
51,310,760✔
1825
  if (l1 != L1_SIMPLE_8B) {
51,310,760✔
1826
    SET_COMPRESS(L1_SIMPLE_8B, l2, lvl, tCmprAlg);
12,474,445✔
1827
  } else {
1828
    tCmprAlg = cmprAlg;
38,836,315✔
1829
  }
1830
  FUNC_COMPRESS_IMPL(pIn, nIn, nEle, pOut, nOut, tCmprAlg, pBuf, nBuf, TSDB_DATA_TYPE_TINYINT, 1);
51,310,760✔
1831
}
1832

1833
int32_t tsDecompressTinyint2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg,
140,271,882✔
1834
                             void *pBuf, int32_t nBuf) {
1835
  uint32_t tCmprAlg = 0;
140,271,882✔
1836
  DEFINE_VAR(cmprAlg)
140,271,882✔
1837
  if (l1 != L1_SIMPLE_8B) {
140,271,882✔
1838
    SET_COMPRESS(L1_SIMPLE_8B, l2, lvl, tCmprAlg);
44,711,524✔
1839
  } else {
1840
    tCmprAlg = cmprAlg;
95,560,358✔
1841
  }
1842
  FUNC_COMPRESS_IMPL(pIn, nIn, nEle, pOut, nOut, tCmprAlg, pBuf, nBuf, TSDB_DATA_TYPE_TINYINT, 0);
140,271,882✔
1843
}
1844

1845
// Smallint =====================================================
1846
int32_t tsCompressSmallint2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg,
25,118,079✔
1847
                            void *pBuf, int32_t nBuf) {
1848
  uint32_t tCmprAlg = 0;
25,118,079✔
1849
  DEFINE_VAR(cmprAlg)
25,118,079✔
1850
  if (l1 != L1_SIMPLE_8B) {
25,118,079✔
1851
    SET_COMPRESS(L1_SIMPLE_8B, l2, lvl, tCmprAlg);
1,647✔
1852
  } else {
1853
    tCmprAlg = cmprAlg;
25,116,432✔
1854
  }
1855
  FUNC_COMPRESS_IMPL(pIn, nIn, nEle, pOut, nOut, tCmprAlg, pBuf, nBuf, TSDB_DATA_TYPE_SMALLINT, 1);
25,118,079✔
1856
}
1857

1858
int32_t tsDecompressSmallint2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg,
47,273,697✔
1859
                              void *pBuf, int32_t nBuf) {
1860
  uint32_t tCmprAlg = 0;
47,273,697✔
1861
  DEFINE_VAR(cmprAlg)
47,273,697✔
1862
  if (l1 != L1_SIMPLE_8B) {
47,273,697✔
1863
    SET_COMPRESS(L1_SIMPLE_8B, l2, lvl, tCmprAlg);
2,196✔
1864
  } else {
1865
    tCmprAlg = cmprAlg;
47,271,501✔
1866
  }
1867
  FUNC_COMPRESS_IMPL(pIn, nIn, nEle, pOut, nOut, tCmprAlg, pBuf, nBuf, TSDB_DATA_TYPE_SMALLINT, 0);
47,273,697✔
1868
}
1869

1870
// Int =====================================================
1871
int32_t tsCompressInt2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf,
118,143,365✔
1872
                       int32_t nBuf) {
1873
  uint32_t tCmprAlg = 0;
118,143,365✔
1874
  DEFINE_VAR(cmprAlg)
118,143,365✔
1875
  if (l1 != L1_SIMPLE_8B) {
118,143,365✔
1876
    SET_COMPRESS(L1_SIMPLE_8B, l2, lvl, tCmprAlg);
48,922,343✔
1877
  } else {
1878
    tCmprAlg = cmprAlg;
69,221,022✔
1879
  }
1880

1881
  FUNC_COMPRESS_IMPL(pIn, nIn, nEle, pOut, nOut, tCmprAlg, pBuf, nBuf, TSDB_DATA_TYPE_INT, 1);
118,143,365✔
1882
}
1883

1884
int32_t tsDecompressInt2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf,
202,158,950✔
1885
                         int32_t nBuf) {
1886
  uint32_t tCmprAlg = 0;
202,158,950✔
1887
  DEFINE_VAR(cmprAlg)
202,158,950✔
1888
  if (l1 != L1_SIMPLE_8B) {
202,158,950✔
1889
    SET_COMPRESS(L1_SIMPLE_8B, l2, lvl, tCmprAlg);
112,370,574✔
1890
  } else {
1891
    tCmprAlg = cmprAlg;
89,788,376✔
1892
  }
1893
  FUNC_COMPRESS_IMPL(pIn, nIn, nEle, pOut, nOut, tCmprAlg, pBuf, nBuf, TSDB_DATA_TYPE_INT, 0);
202,158,950✔
1894
}
1895

1896
// Bigint =====================================================
1897
int32_t tsCompressBigint2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf,
182,882,029✔
1898
                          int32_t nBuf) {
1899
  FUNC_COMPRESS_IMPL(pIn, nIn, nEle, pOut, nOut, cmprAlg, pBuf, nBuf, TSDB_DATA_TYPE_BIGINT, 1);
182,882,029✔
1900
}
1901

1902
int32_t tsDecompressBigint2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg,
445,393,088✔
1903
                            void *pBuf, int32_t nBuf) {
1904
  FUNC_COMPRESS_IMPL(pIn, nIn, nEle, pOut, nOut, cmprAlg, pBuf, nBuf, TSDB_DATA_TYPE_BIGINT, 0);
445,393,088✔
1905
}
1906

1907
int32_t tsCompressDecimal64(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg,
179,891✔
1908
                            void *pBuf, int32_t nBuf) {
1909
  FUNC_COMPRESS_IMPL(pIn, nIn, nEle, pOut, nOut, cmprAlg, pBuf, nBuf, TSDB_DATA_TYPE_DECIMAL64, 1);
179,891✔
1910
}
1911
int32_t tsDecompressDecimal64(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg,
1,090,954✔
1912
                              void *pBuf, int32_t nBuf) {
1913
  FUNC_COMPRESS_IMPL(pIn, nIn, nEle, pOut, nOut, cmprAlg, pBuf, nBuf, TSDB_DATA_TYPE_DECIMAL64, 0);
1,090,954✔
1914
}
1915

1916
int32_t tsCompressDecimal128(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg,
218,703✔
1917
                             void *pBuf, int32_t nBuf) {
1918
  FUNC_COMPRESS_IMPL(pIn, nIn, nEle, pOut, nOut, cmprAlg, pBuf, nBuf, TSDB_DATA_TYPE_DECIMAL, 1);
218,703✔
1919
}
1920
int32_t tsDecompressDecimal128(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg,
3,032,319✔
1921
                               void *pBuf, int32_t nBuf) {
1922
  FUNC_COMPRESS_IMPL(pIn, nIn, nEle, pOut, nOut, cmprAlg, pBuf, nBuf, TSDB_DATA_TYPE_DECIMAL, 0);
3,032,319✔
1923
}
1924

1925
void tcompressDebug(uint32_t cmprAlg, uint8_t *l1Alg, uint8_t *l2Alg, uint8_t *level) {
671,825,936✔
1926
  DEFINE_VAR(cmprAlg)
671,825,936✔
1927
  *l1Alg = l1;
671,825,936✔
1928
  *l2Alg = l2;
671,890,185✔
1929
  *level = lvl;
671,889,498✔
1930
  return;
671,909,277✔
1931
}
1932

1933
int8_t tUpdateCompress(uint32_t oldCmpr, uint32_t newCmpr, uint8_t l2Disabled, uint8_t lvlDiabled, uint8_t lvlDefault,
92,133✔
1934

1935
                       uint32_t *dst) {
1936
  int8_t  update = 0;
92,133✔
1937
  uint8_t ol1 = COMPRESS_L1_TYPE_U32(oldCmpr);
92,133✔
1938
  uint8_t ol2 = COMPRESS_L2_TYPE_U32(oldCmpr);
92,133✔
1939
  uint8_t olvl = COMPRESS_L2_TYPE_LEVEL_U32(oldCmpr);
92,133✔
1940

1941
  uint8_t nl1 = COMPRESS_L1_TYPE_U32(newCmpr);
92,133✔
1942
  uint8_t nl2 = COMPRESS_L2_TYPE_U32(newCmpr);
92,133✔
1943
  uint8_t nlvl = COMPRESS_L2_TYPE_LEVEL_U32(newCmpr);
92,133✔
1944

1945
  // nl1 == 0, not update encode
1946
  // nl2 == 0, not update compress
1947
  // nl3 == 0, not update level
1948
  if (nl1 != 0 && ol1 != nl1) {
92,133✔
1949
    SET_COMPRESS(nl1, ol2, olvl, *dst);
7,426✔
1950
    update = 1;
7,426✔
1951
    ol1 = nl1;
7,426✔
1952
  }
1953

1954
  if (nl2 != 0 && ol2 != nl2) {
92,133✔
1955
    if (nl2 == l2Disabled) {
21,629✔
1956
      SET_COMPRESS(ol1, nl2, lvlDiabled, *dst);
2,136✔
1957
    } else {
1958
      if (ol2 == l2Disabled) {
19,493✔
1959
        SET_COMPRESS(ol1, nl2, lvlDefault, *dst);
2,136✔
1960
      } else {
1961
        SET_COMPRESS(ol1, nl2, olvl, *dst);
17,357✔
1962
      }
1963
    }
1964
    update = 1;
21,629✔
1965
    ol2 = nl2;
21,629✔
1966
  }
1967

1968
  if (nlvl != 0 && olvl != nlvl) {
92,133✔
1969
    if (update == 0) {
62,990✔
1970
      if (ol2 == L2_DISABLED) {
57,694✔
1971
        update = -1;
×
1972
        return update;
×
1973
      }
1974
    }
1975
    SET_COMPRESS(ol1, ol2, nlvl, *dst);
62,990✔
1976
    update = 1;
62,990✔
1977
  }
1978

1979
  return update;
92,133✔
1980
}
1981

1982
int32_t getWordLength(char type) {
2,147,483,647✔
1983
  int32_t wordLength = 0;
2,147,483,647✔
1984
  switch (type) {
2,147,483,647✔
1985
    case TSDB_DATA_TYPE_BIGINT:
1,520,558,697✔
1986
      wordLength = LONG_BYTES;
1,520,558,697✔
1987
      break;
1,520,558,697✔
1988
    case TSDB_DATA_TYPE_INT:
465,599,721✔
1989
      wordLength = INT_BYTES;
465,599,721✔
1990
      break;
465,599,721✔
1991
    case TSDB_DATA_TYPE_SMALLINT:
72,419,008✔
1992
      wordLength = SHORT_BYTES;
72,419,008✔
1993
      break;
72,419,008✔
1994
    case TSDB_DATA_TYPE_TINYINT:
191,625,929✔
1995
      wordLength = CHAR_BYTES;
191,625,929✔
1996
      break;
191,625,929✔
1997
    case TSDB_DATA_TYPE_DECIMAL64:
×
1998
      wordLength = DECIMAL64_BYTES;
×
1999
      break;
×
2000
    case TSDB_DATA_TYPE_DECIMAL:
×
2001
      wordLength = DECIMAL128_BYTES;
×
2002
      break;
×
2003
    default:
31✔
2004
      uError("Invalid decompress integer type:%d", type);
31✔
2005
      return TSDB_CODE_INVALID_PARA;
×
2006
  }
2007

2008
  return wordLength;
2,147,483,647✔
2009
}
2010

2011
int32_t plainCompressImpl(void *src, int32_t srcSize, void *dst, int32_t *dstSize) {
×
2012
  int32_t size = *dstSize;
×
2013
  if (size < srcSize) {
×
2014
    return -1;
×
2015
  }
2016
  memcpy(dst, src, srcSize);
×
2017
  return srcSize;
×
2018
}
2019

2020
int32_t plainDecompressImpl(void *src, int32_t srcSize, void *dst, int32_t *dstSize) {
×
2021
  int32_t size = *dstSize;
×
2022
  if (size < srcSize) {
×
2023
    return -1;
×
2024
  }
2025
  memcpy(dst, src, srcSize);
×
2026
  return 0;
×
2027
}
2028

2029
int32_t lz4CompressImpl(void *src, int32_t srcSize, void *dst, int32_t *dstSize) {
3,528✔
2030
  int32_t size = *dstSize;
3,528✔
2031
  int32_t nWrite = LZ4_compress_default(src, dst, srcSize, size);
3,528✔
2032
  if (nWrite <= 0) {
3,528✔
2033
    return -1;
×
2034
  }
2035
  if (nWrite >= srcSize) {
3,528✔
2036
    return -1;
×
2037
  }
2038

2039
  *dstSize = nWrite;
3,528✔
2040
  return 0;
3,528✔
2041
}
2042
int32_t lz4DecompressImpl(void *src, int32_t srcSize, void *dst, int32_t *dstSize) {
1,668✔
2043
  int32_t size = *dstSize;
1,668✔
2044
  int32_t nread = LZ4_decompress_safe(src, dst, srcSize, size);
1,668✔
2045
  if (nread <= 0) {
1,668✔
2046
    return -1;
×
2047
  }
2048
  *dstSize = nread;
1,668✔
2049
  return 0;
1,668✔
2050
}
2051

2052
int32_t zlibCompressImpl(void *src, int32_t srcSize, void *dst, int32_t *dstSize) {
×
2053
#if defined(WINDOWS) || defined(DARWIN)
2054
  return TSDB_CODE_INVALID_CFG;
2055
#else
2056
  uLongf  dstLen = *dstSize;
×
2057
  int32_t ret = compress2(dst, &dstLen, src, srcSize, Z_BEST_COMPRESSION);
×
2058
  if (ret != Z_OK) {
×
2059
    return -1;
×
2060
  }
2061
  if (dstLen > srcSize) {
×
2062
    return -1;
×
2063
  }
2064
  *dstSize = dstLen;
×
2065
  return 0;
×
2066
#endif
2067
}
2068
int32_t zlibDecompressImpl(void *src, int32_t srcSize, void *dst, int32_t *dstSize) {
×
2069
#if defined(WINDOWS) || defined(DARWIN)
2070
  return TSDB_CODE_INVALID_CFG;
2071
#else
2072
  int32_t code = 0;
×
2073

2074
  uLongf  dstLen = *dstSize;
×
2075
  int32_t ret = uncompress(dst, &dstLen, src, srcSize);
×
2076
  if (ret != Z_OK) {
×
2077
    return -1;
×
2078
  }
2079

2080
  *dstSize = dstLen;
×
2081
  return code;
×
2082
#endif
2083
}
2084

2085
int32_t zstdCompressImpl(void *src, int32_t srcSize, void *dst, int32_t *dstSize) {
×
2086
#if defined(WINDOWS) || defined(DARWIN)
2087
  return TSDB_CODE_INVALID_CFG;
2088
#else
2089
  size_t len = ZSTD_compress(dst, *dstSize, src, srcSize, 9);
×
2090
  if (ZSTD_isError(len)) {
×
2091
    return -1;
×
2092
  }
2093

2094
  if (len > srcSize) {
×
2095
    return -1;
×
2096
  }
2097

2098
  *dstSize = len;
×
2099
  return 0;
×
2100
#endif
2101
}
2102
int32_t zstdDecompressImpl(void *src, int32_t srcSize, void *dst, int32_t *dstSize) {
×
2103
#if defined(WINDOWS) || defined(DARWIN)
2104
  return TSDB_CODE_INVALID_CFG;
2105
#else
2106
  size_t len = ZSTD_decompress(dst, *dstSize, src, srcSize);
×
2107
  if (ZSTD_isError(len)) {
×
2108
    return -1;
×
2109
  }
2110

2111
  *dstSize = len;
×
2112
  return 0;
×
2113
#endif
2114
}
2115

2116
int32_t xzCompressImpl(void *src, int32_t srcSize, void *dst, int32_t *dstSize) {
×
2117
#if defined(WINDOWS) || defined(DARWIN)
2118
  return TSDB_CODE_INVALID_CFG;
2119
#else
2120
  size_t len = FL2_compress(dst, *dstSize, src, srcSize, 9);
×
2121
  if (len == 0 || len > srcSize) {
×
2122
    return -1;
×
2123
  }
2124
  *dstSize = len;
×
2125
  return 0;
×
2126
#endif
2127
}
2128
int32_t xzDecompressImpl(void *src, int32_t srcSize, void *dst, int32_t *dstSize) {
×
2129
#if defined(WINDOWS) || defined(DARWIN)
2130
  return TSDB_CODE_INVALID_CFG;
2131
#else
2132
  size_t len = FL2_decompress(dst, *dstSize, src, srcSize);
×
2133
  if (len == 0) {
×
2134
    return -1;
×
2135
  }
2136
  *dstSize = len;
×
2137
  return 0;
×
2138
#endif
2139
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc