• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3531

19 Nov 2024 10:42AM UTC coverage: 60.213% (-0.006%) from 60.219%
#3531

push

travis-ci

web-flow
Merge pull request #28777 from taosdata/fix/3.0/TD-32366

fix:TD-32366/stmt add geometry datatype check

118529 of 252344 branches covered (46.97%)

Branch coverage included in aggregate %.

7 of 48 new or added lines in 3 files covered. (14.58%)

2282 existing lines in 115 files now uncovered.

199096 of 275161 relevant lines covered (72.36%)

6067577.83 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

63.75
/source/util/src/tcompression.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
/* README.md   TAOS compression
17
 *
18
 * INTEGER Compression Algorithm:
19
 *   To compress integers (including char, short, int32_t, int64_t), the difference
20
 *   between two integers is calculated at first. Then the difference is
21
 *   transformed to positive by zig-zag encoding method
22
 *   (https://gist.github.com/mfuerstenau/ba870a29e16536fdbaba). Then the value is
23
 *   encoded using simple 8B method. For more information about simple 8B,
24
 *   refer to Anh and Moffat, "Index compression using 64-bit words" (Software: Practice and Experience, 2010).
25
 *
26
 *   NOTE : For bigint, only 59 bits can be used, which means data from -(2**59) to (2**59)-1
27
 *   are allowed.
28
 *
29
 * BOOLEAN Compression Algorithm:
30
 *   We provide two methods for compress boolean types. Because boolean types in C
31
 *   code are char bytes with 0 and 1 values only, only one bit needs to be used to discriminate
32
 *   the values.
33
 *   1. The first method is using only 1 bit to represent the boolean value with 1 for
34
 *   true and 0 for false. Then the compression rate is 1/8.
35
 *   2. The second method is using run length encoding (RLE) methods. This method works
36
 *   better when there are a lot of consecutive true values or false values.
37
 *
38
 * STRING Compression Algorithm:
39
 *   We use the LZ4 method to compress the string type.
40
 *
41
 * FLOAT Compression Algorithm:
42
 *   We use the same method with Akumuli to compress float and double types. The compression
43
 *   algorithm assumes the float/double values change slightly. So we take the XOR between two
44
 *   adjacent values. Then compare the number of leading zeros and trailing zeros. If the number
45
 *   of leading zeros is larger than the number of trailing zeros, then record the last several bytes
46
 *   of the XORed value along with the related information. If not, record the first corresponding bytes.
47
 *
48
 */
49

50
#define _DEFAULT_SOURCE
51
#include "tcompression.h"
52
#include "lz4.h"
53
#include "tlog.h"
54
#include "ttypes.h"
55
// #include "tmsg.h"
56

57
#if defined(WINDOWS) || defined(_TD_DARWIN_64)
58
#else
59
#include "fast-lzma2.h"
60
#include "zlib.h"
61
#include "zstd.h"
62
#endif
63

64
#include "td_sz.h"
65

66
int32_t tsCompressPlain2(const char *const input, const int32_t nelements, char *const output, const char type);
67
int32_t tsDecompressPlain2(const char *const input, int32_t ninput, const int32_t nelements, char *const output,
68
                           const char type);
69
// delta
70
int32_t tsCompressTimestampImp2(const char *const input, const int32_t nelements, char *const output, const char type);
71

72
int32_t tsDecompressTimestampImp2(const char *const input, int32_t ninput, const int32_t nelements, char *const output,
73
                                  const char type);
74
// simple8b
75
int32_t tsCompressINTImp2(const char *const input, const int32_t nelements, char *const output, const char type);
76
int32_t tsDecompressINTImp2(const char *const input, int32_t ninput, const int32_t nelements, char *const output,
77
                            const char type);
78

79
// bit
80
int32_t tsCompressBoolImp2(const char *const input, const int32_t nelements, char *const output, char const type);
81
int32_t tsDecompressBoolImp2(const char *const input, int32_t ninput, const int32_t nelements, char *const output,
82
                             char const type);
83

84
// double special
85

86
int32_t tsCompressDoubleImp2(const char *const input, const int32_t nelements, char *const output, char const type);
87
int32_t tsDecompressDoubleImp2(const char *const input, int32_t ninput, const int32_t nelements, char *const output,
88
                               char const type);
89

90
int32_t tsCompressDoubleImp(const char *const input, const int32_t nelements, char *const output);
91
int32_t tsDecompressDoubleImp(const char *const input, int32_t ninput, const int32_t nelements, char *const output);
92
int32_t tsCompressFloatImp(const char *const input, const int32_t nelements, char *const output);
93
int32_t tsDecompressFloatImp(const char *const input, int32_t ninput, const int32_t nelements, char *const output);
94

95
/*
 * Init hook paired with the pass-through ("disabled") level-2 codec.
 * Every argument is accepted and ignored; the hook always reports success (0).
 */
int32_t l2ComressInitImpl_disabled(char *lossyColumns, float fPrecision, double dPrecision, uint32_t maxIntervals,
                                   uint32_t intervals, int32_t ifAdtFse, const char *compressor) {
  (void)lossyColumns;
  (void)fPrecision;
  (void)dPrecision;
  (void)maxIntervals;
  (void)intervals;
  (void)ifAdtFse;
  (void)compressor;

  return 0;
}
99

100
/*
 * Pass-through "compressor": writes a 0 indicator byte followed by an
 * unmodified copy of the input.
 *
 * outputSize, type and lvl are not consulted; the caller must provide at
 * least inputSize + 1 bytes in output.
 *
 * Returns the number of bytes written (inputSize + 1).
 */
int32_t l2CompressImpl_disabled(const char *const input, const int32_t inputSize, char *const output,
                                int32_t outputSize, const char type, int8_t lvl) {
  char *const   payload = output + 1;
  const int32_t written = inputSize + 1;

  *output = 0;  // indicator: payload is stored raw
  memcpy(payload, input, inputSize);

  return written;
}
106
/*
 * Inverse of the pass-through compressor: skips the leading indicator byte
 * and copies the remaining compressedSize - 1 bytes to output.
 *
 * outputSize and type are not consulted.
 *
 * Returns the number of payload bytes copied (compressedSize - 1).
 */
int32_t l2DecompressImpl_disabled(const char *const input, const int32_t compressedSize, char *const output,
                                  int32_t outputSize, const char type) {
  const char *const payload = input + 1;
  const int32_t     payloadSize = compressedSize - 1;

  memcpy(output, payload, payloadSize);

  return payloadSize;
}
111
/*
 * Init hook for the LZ4 level-2 codec. No set-up is performed here: all
 * arguments are ignored and 0 is returned.
 */
int32_t l2ComressInitImpl_lz4(char *lossyColumns, float fPrecision, double dPrecision, uint32_t maxIntervals,
                              uint32_t intervals, int32_t ifAdtFse, const char *compressor) {
  (void)lossyColumns;
  (void)fPrecision;
  (void)dPrecision;
  (void)maxIntervals;
  (void)intervals;
  (void)ifAdtFse;
  (void)compressor;

  return 0;
}
115

116
/*
 * Level-2 LZ4 compression.
 *
 * output[0] is an indicator byte: 1 when the payload that follows is LZ4
 * data, 0 when it is a raw copy of the input. The raw form is used when LZ4
 * reports a non-positive size or when its result is larger than the input.
 *
 * type and lvl are not consulted.
 *
 * Returns the total number of bytes written, indicator byte included.
 */
int32_t l2CompressImpl_lz4(const char *const input, const int32_t inputSize, char *const output, int32_t outputSize,
                           const char type, int8_t lvl) {
  char *const   payload = output + 1;
  const int32_t packed = LZ4_compress_default(input, payload, inputSize, outputSize - 1);

  if (packed > 0 && packed <= inputSize) {
    output[0] = 1;
    return packed + 1;
  }

  // Compression failed or did not help: store the bytes unchanged.
  output[0] = 0;
  memcpy(payload, input, inputSize);
  return inputSize + 1;
}
130
int32_t l2DecompressImpl_lz4(const char *const input, const int32_t compressedSize, char *const output,
8,163,283✔
131
                             int32_t outputSize, const char type) {
132
  if (input[0] == 1) {
8,163,283✔
133
    /* It is compressed by LZ4 algorithm */
134
    const int32_t decompressed_size = LZ4_decompress_safe(input + 1, output, compressedSize - 1, outputSize);
2,385,848✔
135
    if (decompressed_size < 0) {
2,386,520✔
136
      uError("Failed to decompress string with LZ4 algorithm, decompressed size:%d", decompressed_size);
77!
137
      return TSDB_CODE_THIRDPARTY_ERROR;
×
138
    }
139

140
    return decompressed_size;
2,386,443✔
141
  } else if (input[0] == 0) {
5,777,435!
142
    /* It is not compressed by LZ4 algorithm */
143
    memcpy(output, input + 1, compressedSize - 1);
5,778,385✔
144
    return compressedSize - 1;
5,778,385✔
145
  } else if (input[1] == 2) {
×
146
    uError("Invalid decompress string indicator:%d", input[0]);
×
147
    return TSDB_CODE_THIRDPARTY_ERROR;
×
148
  }
149
  return TSDB_CODE_THIRDPARTY_ERROR;
×
150
}
151
/*
 * Init hook for the "tsz" level-2 codec. This hook itself does nothing: all
 * arguments are ignored and 0 is returned.
 */
int32_t l2ComressInitImpl_tsz(char *lossyColumns, float fPrecision, double dPrecision, uint32_t maxIntervals,
                              uint32_t intervals, int32_t ifAdtFse, const char *compressor) {
  (void)lossyColumns;
  (void)fPrecision;
  (void)dPrecision;
  (void)maxIntervals;
  (void)intervals;
  (void)ifAdtFse;
  (void)compressor;

  return 0;
}
155
int32_t l2CompressImpl_tsz(const char *const input, const int32_t inputSize, char *const output, int32_t outputSize,
×
156
                           const char type, int8_t lvl) {
157
  if (type == TSDB_DATA_TYPE_FLOAT) {
×
158
    if (lossyFloat) {
×
159
      return tsCompressFloatLossyImp(input, inputSize, output);
×
160
    }
161
  } else if (type == TSDB_DATA_TYPE_DOUBLE) {
×
162
    if (lossyDouble) {
×
163
      return tsCompressDoubleLossyImp(input, inputSize, output);
×
164
    }
165
  }
166

167
  return l2CompressImpl_lz4(input, inputSize, output, outputSize, type, lvl);
×
168
}
169

170
int32_t l2DecompressImpl_tsz(const char *const input, const int32_t inputSize, char *const output, int32_t outputSize,
×
171
                             const char type) {
172
  if (type == TSDB_DATA_TYPE_FLOAT || type == TSDB_DATA_TYPE_DOUBLE) {
×
173
    if (HEAD_ALGO(((uint8_t *)input)[0]) == ALGO_SZ_LOSSY) {
×
174
      return tsDecompressFloatLossyImp(input, inputSize, outputSize, output);
×
175
    }
176
  }
177

178
  return l2DecompressImpl_lz4(input, inputSize, output, outputSize, type);
×
179
}
180

181
#if defined(WINDOWS) || defined(_TD_DARWIN_64)
182
// do nothing
183
#else
184

185
/*
 * Init hook for the zlib level-2 codec. No set-up is performed: all arguments
 * are ignored and 0 is returned.
 */
int32_t l2ComressInitImpl_zlib(char *lossyColumns, float fPrecision, double dPrecision, uint32_t maxIntervals,
                               uint32_t intervals, int32_t ifAdtFse, const char *compressor) {
  (void)lossyColumns;
  (void)fPrecision;
  (void)dPrecision;
  (void)maxIntervals;
  (void)intervals;
  (void)ifAdtFse;
  (void)compressor;

  return 0;
}
189

190
int32_t l2CompressImpl_zlib(const char *const input, const int32_t inputSize, char *const output, int32_t outputSize,
422,278✔
191
                            const char type, int8_t lvl) {
192
  uLongf  dstLen = outputSize - 1;
422,278✔
193
  int32_t ret = compress2((Bytef *)(output + 1), (uLongf *)&dstLen, (Bytef *)input, (uLong)inputSize, lvl);
422,278✔
194
  if (ret == Z_OK) {
422,299!
195
    output[0] = 1;
422,299✔
196
    return dstLen + 1;
422,299✔
197
  } else {
UNCOV
198
    output[0] = 0;
×
UNCOV
199
    memcpy(output + 1, input, inputSize);
×
UNCOV
200
    return inputSize + 1;
×
201
  }
202
  return TSDB_CODE_THIRDPARTY_ERROR;
203
}
204
int32_t l2DecompressImpl_zlib(const char *const input, const int32_t compressedSize, char *const output,
1,207,719✔
205
                              int32_t outputSize, const char type) {
206
  if (input[0] == 1) {
1,207,719!
207
    uLongf len = outputSize;
1,207,799✔
208
    int    ret = uncompress((Bytef *)output, &len, (Bytef *)input + 1, compressedSize - 1);
1,207,799✔
209
    if (ret == Z_OK) {
1,207,990!
210
      return len;
1,208,026✔
211
    } else {
212
      return TSDB_CODE_THIRDPARTY_ERROR;
×
213
    }
214

215
  } else if (input[0] == 0) {
×
216
    /* It is not compressed by LZ4 algorithm */
217
    memcpy(output, input + 1, compressedSize - 1);
×
218
    return compressedSize - 1;
×
219
  } else if (input[1] == 2) {
×
220
    uError("Invalid decompress string indicator:%d", input[0]);
×
221
    return TSDB_CODE_THIRDPARTY_ERROR;
×
222
  }
223
  return 0;
×
224
}
225
/*
 * Init hook for the zstd level-2 codec. No set-up is performed: all arguments
 * are ignored and 0 is returned.
 */
int32_t l2ComressInitImpl_zstd(char *lossyColumns, float fPrecision, double dPrecision, uint32_t maxIntervals,
                               uint32_t intervals, int32_t ifAdtFse, const char *compressor) {
  (void)lossyColumns;
  (void)fPrecision;
  (void)dPrecision;
  (void)maxIntervals;
  (void)intervals;
  (void)ifAdtFse;
  (void)compressor;

  return 0;
}
229

230
/*
 * Level-2 zstd compression.
 *
 * output[0] is an indicator byte: 1 when the payload that follows is zstd
 * data, 0 when it is a raw copy of the input. The raw form is used whenever
 * the value returned by ZSTD_compress() exceeds inputSize.
 *
 * type is not consulted.
 *
 * Returns the total number of bytes written, indicator byte included.
 */
int32_t l2CompressImpl_zstd(const char *const input, const int32_t inputSize, char *const output, int32_t outputSize,
                            const char type, int8_t lvl) {
  char *const  payload = output + 1;
  const size_t produced = ZSTD_compress(payload, outputSize - 1, input, inputSize, lvl);

  if (produced <= (size_t)inputSize) {
    output[0] = 1;
    return (int32_t)(produced + 1);
  }

  // Result is larger than the input: keep the original bytes instead.
  output[0] = 0;
  memcpy(payload, input, inputSize);
  return inputSize + 1;
}
242
int32_t l2DecompressImpl_zstd(const char *const input, const int32_t compressedSize, char *const output,
2,574,706✔
243
                              int32_t outputSize, const char type) {
244
  if (input[0] == 1) {
2,574,706✔
245
    return ZSTD_decompress(output, outputSize, input + 1, compressedSize - 1);
848,114✔
246
  } else if (input[0] == 0) {
1,726,592!
247
    memcpy(output, input + 1, compressedSize - 1);
1,726,907✔
248
    return compressedSize - 1;
1,726,907✔
249
  }
250
  return TSDB_CODE_THIRDPARTY_ERROR;
×
251
}
252

253
/*
 * Init hook for the xz level-2 codec. No set-up is performed: all arguments
 * are ignored and 0 is returned.
 */
int32_t l2ComressInitImpl_xz(char *lossyColumns, float fPrecision, double dPrecision, uint32_t maxIntervals,
                             uint32_t intervals, int32_t ifAdtFse, const char *compressor) {
  (void)lossyColumns;
  (void)fPrecision;
  (void)dPrecision;
  (void)maxIntervals;
  (void)intervals;
  (void)ifAdtFse;
  (void)compressor;

  return 0;
}
257
/*
 * Level-2 xz compression.
 *
 * output[0] is an indicator byte: 1 when the payload that follows was
 * produced by FL2_compress(), 0 when it is a raw copy of the input. The raw
 * form is used whenever the value returned by FL2_compress() exceeds
 * inputSize.
 *
 * type is not consulted.
 *
 * Returns the total number of bytes written, indicator byte included.
 */
int32_t l2CompressImpl_xz(const char *const input, const int32_t inputSize, char *const output, int32_t outputSize,
                          const char type, int8_t lvl) {
  char *const  payload = output + 1;
  const size_t produced = FL2_compress(payload, outputSize - 1, input, inputSize, lvl);

  if (produced <= (size_t)inputSize) {
    output[0] = 1;
    return (int32_t)(produced + 1);
  }

  // Result is larger than the input: keep the original bytes instead.
  output[0] = 0;
  memcpy(payload, input, inputSize);
  return inputSize + 1;
}
268
int32_t l2DecompressImpl_xz(const char *const input, const int32_t compressedSize, char *const output,
×
269
                            int32_t outputSize, const char type) {
270
  if (input[0] == 1) {
×
271
    return FL2_decompress(output, outputSize, input + 1, compressedSize - 1);
×
272
  } else if (input[0] == 0) {
×
273
    memcpy(output, input + 1, compressedSize - 1);
×
274
    return compressedSize - 1;
×
275
  }
276
  return TSDB_CODE_THIRDPARTY_ERROR;
×
277
}
278
#endif
279

280
TCmprL1FnSet compressL1Dict[] = {{"PLAIN", NULL, tsCompressPlain2, tsDecompressPlain2},
281
                                 {"SIMPLE-8B", NULL, tsCompressINTImp2, tsDecompressINTImp2},
282
                                 {"DELTAI", NULL, tsCompressTimestampImp2, tsDecompressTimestampImp2},
283
                                 {"BIT-PACKING", NULL, tsCompressBoolImp2, tsDecompressBoolImp2},
284
                                 {"DELTAD", NULL, tsCompressDoubleImp2, tsDecompressDoubleImp2}};
285

286
TCmprLvlSet compressL2LevelDict[] = {
287
    {"unknown", .lvl = {1, 2, 3}}, {"lz4", .lvl = {1, 2, 3}}, {"zlib", .lvl = {1, 6, 9}},
288
    {"zstd", .lvl = {1, 11, 22}},  {"tsz", .lvl = {1, 2, 3}}, {"xz", .lvl = {1, 6, 9}},
289
};
290

291
#if defined(WINDOWS) || defined(_TD_DARWIN_64)
292
TCmprL2FnSet compressL2Dict[] = {
293
    {"unknown", l2ComressInitImpl_disabled, l2CompressImpl_disabled, l2DecompressImpl_disabled},
294
    {"lz4", l2ComressInitImpl_lz4, l2CompressImpl_lz4, l2DecompressImpl_lz4},
295
    {"zlib", l2ComressInitImpl_lz4, l2CompressImpl_lz4, l2DecompressImpl_lz4},
296
    {"zstd", l2ComressInitImpl_lz4, l2CompressImpl_lz4, l2DecompressImpl_lz4},
297
    {"tsz", l2ComressInitImpl_tsz, l2CompressImpl_tsz, l2DecompressImpl_tsz},
298
    {"xz", l2ComressInitImpl_lz4, l2CompressImpl_lz4, l2DecompressImpl_lz4}};
299
#else
300
TCmprL2FnSet compressL2Dict[] = {
301
    {"unknown", l2ComressInitImpl_disabled, l2CompressImpl_disabled, l2DecompressImpl_disabled},
302
    {"lz4", l2ComressInitImpl_lz4, l2CompressImpl_lz4, l2DecompressImpl_lz4},
303
    {"zlib", l2ComressInitImpl_zlib, l2CompressImpl_zlib, l2DecompressImpl_zlib},
304
    {"zstd", l2ComressInitImpl_zstd, l2CompressImpl_zstd, l2DecompressImpl_zstd},
305
    {"tsz", l2ComressInitImpl_tsz, l2CompressImpl_tsz, l2DecompressImpl_tsz},
306
    {"xz", l2ComressInitImpl_xz, l2CompressImpl_xz, l2DecompressImpl_xz}};
307

308
#endif
309

310
int8_t tsGetCompressL2Level(uint8_t alg, uint8_t lvl) {
2,645,261✔
311
  if (lvl == L2_LVL_LOW) {
2,645,261!
312
    return compressL2LevelDict[alg].lvl[0];
×
313
  } else if (lvl == L2_LVL_MEDIUM) {
2,645,261!
314
    return compressL2LevelDict[alg].lvl[1];
2,645,427✔
315
  } else if (lvl == L2_LVL_HIGH) {
×
316
    return compressL2LevelDict[alg].lvl[2];
×
317
  }
318
  return 1;
×
319
}
320

321
static const int32_t TEST_NUMBER = 1;
322
#define is_bigendian()     ((*(char *)&TEST_NUMBER) == 0)
323
#define SIMPLE8B_MAX_INT64 ((uint64_t)1152921504606846974LL)
324

325
/* Non-zero when a + b fits in int64_t. Arguments are parenthesized so that arbitrary expressions may be passed. */
#define safeInt64Add(a, b) ((((a) >= 0) && ((b) <= INT64_MAX - (a))) || (((a) < 0) && ((b) >= INT64_MIN - (a))))
326

327
bool lossyFloat = false;
328
bool lossyDouble = false;
329

330
// init call
331
void tsCompressInit(char *lossyColumns, float fPrecision, double dPrecision, uint32_t maxIntervals, uint32_t intervals,
2,372✔
332
                    int32_t ifAdtFse, const char *compressor) {
333
  // config
334
  lossyFloat = strstr(lossyColumns, "float") != NULL;
2,372✔
335
  lossyDouble = strstr(lossyColumns, "double") != NULL;
2,372✔
336

337
  tdszInit(fPrecision, dPrecision, maxIntervals, intervals, ifAdtFse, compressor);
2,372✔
338
  if (lossyFloat) uTrace("lossy compression float  is opened. ");
2,372!
339
  if (lossyDouble) uTrace("lossy compression double is opened. ");
2,372!
340
  return;
2,372✔
341
}
342
// exit call
343
/* Shut the compression module down by calling tdszExit(). */
void tsCompressExit() {
  tdszExit();
}
2,371✔
344

345
/*
346
 * Compress Integer (Simple8B).
347
 */
348
int32_t tsCompressINTImp(const char *const input, const int32_t nelements, char *const output, const char type) {
2,371,705✔
349
  // Selector value:              0    1   2   3   4   5   6   7   8  9  10  11
350
  // 12  13  14  15
351
  char    bit_per_integer[] = {0, 0, 1, 2, 3, 4, 5, 6, 7, 8, 10, 12, 15, 20, 30, 60};
2,371,705✔
352
  int32_t selector_to_elems[] = {240, 120, 60, 30, 20, 15, 12, 10, 8, 7, 6, 5, 4, 3, 2, 1};
2,371,705✔
353
  char    bit_to_selector[] = {0,  2,  3,  4,  5,  6,  7,  8,  9,  10, 10, 11, 11, 12, 12, 12, 13, 13, 13, 13, 13,
2,371,705✔
354
                               14, 14, 14, 14, 14, 14, 14, 14, 14, 14, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15,
355
                               15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15};
356

357
  // get the byte limit.
358
  int32_t word_length = getWordLength(type);
2,371,705✔
359

360
  int32_t byte_limit = nelements * word_length + 1;
2,371,498✔
361
  int32_t opos = 1;
2,371,498✔
362
  int64_t prev_value = 0;
2,371,498✔
363

364
  for (int32_t i = 0; i < nelements;) {
90,181,874✔
365
    char    selector = 0;
88,566,957✔
366
    char    bit = 0;
88,566,957✔
367
    int32_t elems = 0;
88,566,957✔
368
    int64_t prev_value_tmp = prev_value;
88,566,957✔
369

370
    for (int32_t j = i; j < nelements; j++) {
649,923,157✔
371
      // Read data from the input stream and convert it to INT64 type.
372
      int64_t curr_value = 0;
646,939,291✔
373
      switch (type) {
646,939,291!
374
        case TSDB_DATA_TYPE_TINYINT:
103,596,741✔
375
          curr_value = (int64_t)(*((int8_t *)input + j));
103,596,741✔
376
          break;
103,596,741✔
377
        case TSDB_DATA_TYPE_SMALLINT:
73,804,138✔
378
          curr_value = (int64_t)(*((int16_t *)input + j));
73,804,138✔
379
          break;
73,804,138✔
380
        case TSDB_DATA_TYPE_INT:
346,512,999✔
381
          curr_value = (int64_t)(*((int32_t *)input + j));
346,512,999✔
382
          break;
346,512,999✔
383
        case TSDB_DATA_TYPE_BIGINT:
127,307,289✔
384
          curr_value = (int64_t)(*((int64_t *)input + j));
127,307,289✔
385
          break;
127,307,289✔
386
      }
387
      // Get difference.
388
      if (!safeInt64Add(curr_value, -prev_value_tmp)) goto _copy_and_exit;
646,939,291!
389

390
      int64_t diff = curr_value - prev_value_tmp;
646,939,271✔
391
      // Zigzag encode the value.
392
      uint64_t zigzag_value = ZIGZAG_ENCODE(int64_t, diff);
646,939,271✔
393

394
      if (zigzag_value >= SIMPLE8B_MAX_INT64) goto _copy_and_exit;
646,939,271✔
395

396
      int64_t tmp_bit;
397
      if (zigzag_value == 0) {
646,632,625✔
398
        // Take care here, __builtin_clzl give wrong anser for value 0;
399
        tmp_bit = 0;
68,164,604✔
400
      } else {
401
        tmp_bit = (LONG_BYTES * BITS_PER_BYTE) - BUILDIN_CLZL(zigzag_value);
578,468,021✔
402
      }
403

404
      if (elems + 1 <= selector_to_elems[(int32_t)selector] &&
646,632,625✔
405
          elems + 1 <= selector_to_elems[(int32_t)(bit_to_selector[(int32_t)tmp_bit])]) {
568,807,595✔
406
        // If can hold another one.
407
        selector = selector > bit_to_selector[(int32_t)tmp_bit] ? selector : bit_to_selector[(int32_t)tmp_bit];
561,356,200✔
408
        elems++;
561,356,200✔
409
        bit = bit_per_integer[(int32_t)selector];
561,356,200✔
410
      } else {
411
        // if cannot hold another one.
412
        while (elems < selector_to_elems[(int32_t)selector]) selector++;
94,566,307✔
413
        elems = selector_to_elems[(int32_t)selector];
85,276,425✔
414
        bit = bit_per_integer[(int32_t)selector];
85,276,425✔
415
        break;
85,276,425✔
416
      }
417
      prev_value_tmp = curr_value;
561,356,200✔
418
    }
419

420
    uint64_t buffer = 0;
88,260,291✔
421
    buffer |= (uint64_t)selector;
88,260,291✔
422
    for (int32_t k = 0; k < elems; k++) {
651,909,748✔
423
      int64_t curr_value = 0; /* get current values */
563,649,457✔
424
      switch (type) {
563,649,457!
425
        case TSDB_DATA_TYPE_TINYINT:
94,318,501✔
426
          curr_value = (int64_t)(*((int8_t *)input + i));
94,318,501✔
427
          break;
94,318,501✔
428
        case TSDB_DATA_TYPE_SMALLINT:
67,067,400✔
429
          curr_value = (int64_t)(*((int16_t *)input + i));
67,067,400✔
430
          break;
67,067,400✔
431
        case TSDB_DATA_TYPE_INT:
297,167,846✔
432
          curr_value = (int64_t)(*((int32_t *)input + i));
297,167,846✔
433
          break;
297,167,846✔
434
        case TSDB_DATA_TYPE_BIGINT:
106,296,384✔
435
          curr_value = (int64_t)(*((int64_t *)input + i));
106,296,384✔
436
          break;
106,296,384✔
437
      }
438
      int64_t  diff = curr_value - prev_value;
563,649,457✔
439
      uint64_t zigzag_value = ZIGZAG_ENCODE(int64_t, diff);
563,649,457✔
440
      buffer |= ((zigzag_value & INT64MASK(bit)) << (bit * k + 4));
563,649,457✔
441
      i++;
563,649,457✔
442
      prev_value = curr_value;
563,649,457✔
443
    }
444

445
    // Output the encoded value to the output.
446
    if (opos + sizeof(buffer) <= byte_limit) {
88,260,291✔
447
      memcpy(output + opos, &buffer, sizeof(buffer));
87,810,376✔
448
      opos += sizeof(buffer);
87,810,376✔
449
    } else {
450
    _copy_and_exit:
449,915✔
451
      output[0] = 1;
756,581✔
452
      memcpy(output + 1, input, byte_limit - 1);
756,581✔
453
      return byte_limit;
756,581✔
454
    }
455
  }
456

457
  // set the indicator.
458
  output[0] = 0;
1,614,917✔
459
  return opos;
1,614,917✔
460
}
461

462
int32_t tsDecompressINTImp(const char *const input, const int32_t nelements, char *const output, const char type) {
16,918,103✔
463
  int32_t word_length = getWordLength(type);
16,918,103✔
464
  if (word_length < 0) {
16,916,356!
465
    return -1;
×
466
  }
467

468
  // If not compressed.
469
  if (input[0] == 1) {
16,916,356✔
470
    memcpy(output, input + 1, nelements * word_length);
6,917,273✔
471
    return nelements * word_length;
6,917,273✔
472
  }
473

474
  if (tsSIMDEnable && tsAVX512Enable && tsAVX512Supported) {
9,999,083!
475
    int32_t cnt = tsDecompressIntImpl_Hw(input, nelements, output, type);
×
476
    if (cnt >= 0) {
×
477
      return cnt;
×
478
    }
479
  }
480

481
  // Selector value: 0    1   2   3   4   5   6   7   8  9  10  11 12  13  14  15
482
  char    bit_per_integer[] = {0, 0, 1, 2, 3, 4, 5, 6, 7, 8, 10, 12, 15, 20, 30, 60};
9,999,083✔
483
  int32_t selector_to_elems[] = {240, 120, 60, 30, 20, 15, 12, 10, 8, 7, 6, 5, 4, 3, 2, 1};
9,999,083✔
484

485
  const char *ip = input + 1;
9,999,083✔
486
  char       *op = output;
9,999,083✔
487
  int32_t     count = 0;
9,999,083✔
488
  int64_t     prev_value = 0;
9,999,083✔
489

490
  while (count < nelements) {
143,769,176✔
491
    uint64_t w = *(uint64_t *)ip;
133,770,093✔
492

493
    char    selector = (char)(w & INT64MASK(4));       // selector = 4
133,770,093✔
494
    char    bit = bit_per_integer[(int32_t)selector];  // bit = 3
133,770,093✔
495
    int32_t elems = selector_to_elems[(int32_t)selector];
133,770,093✔
496

497
    switch (type) {
133,770,093!
498
      case TSDB_DATA_TYPE_BIGINT: {
80,104,967✔
499
        int64_t *out = (int64_t *)op;
80,104,967✔
500
        if (selector == 0 || selector == 1) {
80,104,967!
501
          for (int32_t i = 0; i < elems && count < nelements; ++i, ++count, ++out) {
44,177,901✔
502
            *out = prev_value;
42,947,020✔
503
          }
504
        } else {
505
          uint64_t zigzag_value = 0;
78,874,086✔
506
          for (int32_t i = 0; i < elems && count < nelements; ++i, ++count, ++out) {
470,068,383✔
507
            zigzag_value = ((w >> (4 + bit * i)) & INT64MASK(bit));
391,194,297✔
508
            prev_value += ZIGZAG_DECODE(int64_t, zigzag_value);
391,194,297✔
509
            *out = prev_value;
391,194,297✔
510
          }
511
        }
512
        op = (char *)out;
80,104,967✔
513
        break;
80,104,967✔
514
      }
515
      case TSDB_DATA_TYPE_INT: {
48,205,355✔
516
        int32_t *out = (int32_t *)op;
48,205,355✔
517
        if (selector == 0 || selector == 1) {
48,205,355!
518
          for (int32_t i = 0; i < elems && count < nelements; ++i, ++count, ++out) {
6,617,495✔
519
            *out = (int32_t)prev_value;
6,483,567✔
520
          }
521
        } else {
522
          uint64_t zigzag_value = 0;
48,071,427✔
523
          for (int32_t i = 0; i < elems && count < nelements; ++i, ++count, ++out) {
452,701,391✔
524
            zigzag_value = ((w >> (4 + bit * i)) & INT64MASK(bit));
404,629,964✔
525
            prev_value += ZIGZAG_DECODE(int64_t, zigzag_value);
404,629,964✔
526
            *out = (int32_t)prev_value;
404,629,964✔
527
          }
528
        }
529
        op = (char *)out;
48,205,355✔
530
        break;
48,205,355✔
531
      }
532
      case TSDB_DATA_TYPE_SMALLINT: {
3,846,813✔
533
        int16_t *out = (int16_t *)op;
3,846,813✔
534
        if (selector == 0 || selector == 1) {
3,846,813!
535
          for (int32_t i = 0; i < elems && count < nelements; ++i, ++count, ++out) {
3,414,711✔
536
            *out = (int16_t)prev_value;
3,379,180✔
537
          }
538
        } else {
539
          uint64_t zigzag_value = 0;
3,811,282✔
540
          for (int32_t i = 0; i < elems && count < nelements; ++i, ++count, ++out) {
36,000,359✔
541
            zigzag_value = ((w >> (4 + bit * i)) & INT64MASK(bit));
32,189,077✔
542
            prev_value += ZIGZAG_DECODE(int64_t, zigzag_value);
32,189,077✔
543
            *out = (int16_t)prev_value;
32,189,077✔
544
          }
545
        }
546
        op = (char *)out;
3,846,813✔
547
        break;
3,846,813✔
548
      }
549
      case TSDB_DATA_TYPE_TINYINT: {
1,612,958✔
550
        int8_t *out = (int8_t *)op;
1,612,958✔
551
        if (selector == 0 || selector == 1) {
1,612,958✔
552
          for (int32_t i = 0; i < elems && count < nelements; ++i, ++count, ++out) {
3,439,101✔
553
            *out = (int8_t)prev_value;
3,414,074✔
554
          }
555
        } else {
556
          uint64_t zigzag_value = 0;
1,587,931✔
557
          for (int32_t i = 0; i < elems && count < nelements; ++i, ++count, ++out) {
20,020,306✔
558
            zigzag_value = ((w >> (4 + bit * i)) & INT64MASK(bit));
18,432,375✔
559
            prev_value += ZIGZAG_DECODE(int64_t, zigzag_value);
18,432,375✔
560
            *out = (int8_t)prev_value;
18,432,375✔
561
          }
562
        }
563
        op = (char *)out;
1,612,958✔
564
        break;
1,612,958✔
565
      }
566
      default:
×
567
        perror("Wrong integer types.\n");
×
568
        return -1;
×
569
    }
570
    ip += LONG_BYTES;
133,770,093✔
571
  }
572

573
  return nelements * word_length;
9,999,083✔
574
}
575

576
/* ----------------------------------------------Bool Compression ---------------------------------------------- */
577
// TODO: You can also implement it using RLE method.
578
int32_t tsCompressBoolImp(const char *const input, const int32_t nelements, char *const output) {
165,150✔
579
  int32_t pos = -1;
165,150✔
580
  int32_t ele_per_byte = BITS_PER_BYTE / 2;
165,150✔
581

582
  for (int32_t i = 0; i < nelements; i++) {
25,556,232✔
583
    if (i % ele_per_byte == 0) {
25,391,113✔
584
      pos++;
6,388,834✔
585
      output[pos] = 0;
6,388,834✔
586
    }
587

588
    uint8_t t = 0;
25,391,113✔
589
    if (input[i] == 1) {
25,391,113✔
590
      t = (((uint8_t)1) << (2 * (i % ele_per_byte)));
14,146,726✔
591
      output[pos] |= t;
14,146,726✔
592
    } else if (input[i] == 0) {
11,244,387✔
593
      t = ((uint8_t)1 << (2 * (i % ele_per_byte))) - 1;
11,244,356✔
594
      /* t = (~((( uint8_t)1) << (7-i%BITS_PER_BYTE))); */
595
      output[pos] &= t;
11,244,356✔
596
    } else if (input[i] == TSDB_DATA_BOOL_NULL) {
31!
597
      t = ((uint8_t)2 << (2 * (i % ele_per_byte)));
×
598
      /* t = (~((( uint8_t)1) << (7-i%BITS_PER_BYTE))); */
599
      output[pos] |= t;
×
600
    } else {
601
      uError("Invalid compress bool value:%d", output[pos]);
31!
602
      return TSDB_CODE_INVALID_PARA;
×
603
    }
604
  }
605

606
  return pos + 1;
165,119✔
607
}
608

609
int32_t tsDecompressBoolImp(const char *const input, const int32_t nelements, char *const output) {
170,175✔
610
  int32_t ipos = -1, opos = 0;
170,175✔
611
  int32_t ele_per_byte = BITS_PER_BYTE / 2;
170,175✔
612

613
  for (int32_t i = 0; i < nelements; i++) {
25,395,086✔
614
    if (i % ele_per_byte == 0) {
25,224,911✔
615
      ipos++;
6,351,167✔
616
    }
617

618
    uint8_t ele = (input[ipos] >> (2 * (i % ele_per_byte))) & INT8MASK(2);
25,224,911✔
619
    if (ele == 1) {
25,224,911✔
620
      output[opos++] = 1;
16,894,255✔
621
    } else if (ele == 2) {
8,330,656!
622
      output[opos++] = TSDB_DATA_BOOL_NULL;
×
623
    } else {
624
      output[opos++] = 0;
8,330,656✔
625
    }
626
  }
627

628
  return nelements;
170,175✔
629
}
630
/*
 * Adapter giving tsCompressBoolImp the (input, nelements, output, type)
 * signature. type is ignored.
 */
int32_t tsCompressBoolImp2(const char *const input, const int32_t nelements, char *const output, char const type) {
  (void)type;

  const int32_t written = tsCompressBoolImp(input, nelements, output);
  return written;
}
633
/*
 * Adapter giving tsDecompressBoolImp the (input, ninput, nelements, output,
 * type) signature. ninput and type are ignored.
 */
int32_t tsDecompressBoolImp2(const char *const input, int32_t ninput, const int32_t nelements, char *const output,
                             char const type) {
  (void)ninput;
  (void)type;

  const int32_t decoded = tsDecompressBoolImp(input, nelements, output);
  return decoded;
}
637

638
int32_t tsCompressDoubleImp2(const char *const input, const int32_t nelements, char *const output, char const type) {
296,372✔
639
  if (type == TSDB_DATA_TYPE_FLOAT) {
296,372✔
640
    return tsCompressFloatImp(input, nelements, output);
181,213✔
641
  } else if (type == TSDB_DATA_TYPE_DOUBLE) {
115,159!
642
    return tsCompressDoubleImp(input, nelements, output);
115,181✔
643
  }
644
  return TSDB_CODE_THIRDPARTY_ERROR;
×
645
}
646
int32_t tsDecompressDoubleImp2(const char *const input, int32_t ninput, const int32_t nelements, char *const output,
808,855✔
647
                               char const type) {
648
  if (type == TSDB_DATA_TYPE_FLOAT) {
808,855✔
649
    return tsDecompressFloatImp(input, ninput, nelements, output);
438,678✔
650
  } else if (type == TSDB_DATA_TYPE_DOUBLE) {
370,177!
651
    return tsDecompressDoubleImp(input, ninput, nelements, output);
370,233✔
652
  }
653
  return TSDB_CODE_THIRDPARTY_ERROR;
×
654
}
655
/* Adapter that forwards all of its arguments, unchanged, to tsCompressINTImp. */
int32_t tsCompressINTImp2(const char *const input, const int32_t nelements, char *const output, const char type) {
  const int32_t written = tsCompressINTImp(input, nelements, output, type);
  return written;
}
658
/*
 * Adapter giving tsDecompressINTImp the (input, ninput, nelements, output,
 * type) signature. ninput is ignored.
 */
int32_t tsDecompressINTImp2(const char *const input, int32_t ninput, const int32_t nelements, char *const output,
                            const char type) {
  (void)ninput;

  const int32_t decoded = tsDecompressINTImp(input, nelements, output, type);
  return decoded;
}
662

663
#if 0
/* Run Length Encoding(RLE) Method (currently compiled out) */

// Encodes a bool column as a sequence of run bytes. Each output byte holds the
// run length in bits 1..7 (capped at INT8MASK(7)) and the bool value in bit 0.
// Returns the number of bytes written, or -1 if an input byte is neither 0 nor 1.
int32_t tsCompressBoolRLEImp(const char *const input, const int32_t nelements, char *const output) {
  int32_t _pos = 0;

  for (int32_t i = 0; i < nelements;) {
    unsigned char counter = 1;
    char          num = input[i];

    // Extend the run while the value repeats and the 7-bit counter has room.
    for (++i; i < nelements; i++) {
      if (input[i] == num) {
        counter++;
        if (counter == INT8MASK(7)) {
          i++;
          break;
        }
      } else {
        break;
      }
    }

    // Encode the data.
    if (num == 1) {
      output[_pos++] = INT8MASK(1) | (counter << 1);
    } else if (num == 0) {
      output[_pos++] = (counter << 1) | INT8MASK(0);
    } else {
      // Report the offending input value (not the not-yet-written output byte).
      uError("Invalid compress bool value:%d", num);
      return -1;
    }
  }

  return _pos;
}

// Expands the run bytes produced by tsCompressBoolRLEImp into `nelements`
// bool bytes. Returns nelements, or -1 if a zero-length run is encountered
// (which could never make progress).
int32_t tsDecompressBoolRLEImp(const char *const input, const int32_t nelements, char *const output) {
  int32_t ipos = 0, opos = 0;
  while (1) {
    char     encode = input[ipos++];
    unsigned counter = (encode >> 1) & INT8MASK(7);
    char     value = encode & INT8MASK(1);

    if (counter == 0) {
      return -1;
    }
    // Never write past the requested number of elements.
    if (counter > (unsigned)(nelements - opos)) {
      counter = (unsigned)(nelements - opos);
    }

    memset(output + opos, value, counter);
    opos += counter;
    if (opos >= nelements) {
      return nelements;
    }
  }
}
#endif
713

714
/* ----------------------------------------------String Compression ---------------------------------------------- */
715
// Note: the size of the output must be larger than input_size + 1 and
716
// LZ4_compressBound(size) + 1;
717
// >= max(input_size, LZ4_compressBound(input_size)) + 1;
718
// Compresses `inputSize` bytes with LZ4 into `output`.
//
// Layout of `output`: byte 0 is an indicator, the payload starts at byte 1.
//   indicator 1 -> payload is the LZ4 stream
//   indicator 0 -> payload is a verbatim copy of the input (LZ4 failed or
//                  produced more bytes than the input)
// Returns the total number of bytes written, indicator included.
int32_t tsCompressStringImp(const char *const input, int32_t inputSize, char *const output, int32_t outputSize) {
  char *const   payload = output + 1;
  const int32_t lz4Size = LZ4_compress_default(input, payload, inputSize, outputSize - 1);

  if (lz4Size > 0 && lz4Size <= inputSize) {
    output[0] = 1;
    return lz4Size + 1;
  }

  // Fall back to storing the raw bytes behind a zero indicator.
  output[0] = 0;
  memcpy(payload, input, inputSize);
  return inputSize + 1;
}
733

734
int32_t tsDecompressStringImp(const char *const input, int32_t compressedSize, char *const output, int32_t outputSize) {
13,006,047✔
735
  // compressedSize is the size of data after compression.
736

737
  if (input[0] == 1) {
13,006,047✔
738
    /* It is compressed by LZ4 algorithm */
739
    const int32_t decompressed_size = LZ4_decompress_safe(input + 1, output, compressedSize - 1, outputSize);
1,408,691✔
740
    if (decompressed_size < 0) {
1,408,625✔
741
      uError("Failed to decompress string with LZ4 algorithm, decompressed size:%d", decompressed_size);
6!
742
      return TSDB_CODE_THIRDPARTY_ERROR;
×
743
    }
744

745
    return decompressed_size;
1,408,619✔
746
  } else if (input[0] == 0) {
11,597,356!
747
    /* It is not compressed by LZ4 algorithm */
748
    memcpy(output, input + 1, compressedSize - 1);
11,599,387✔
749
    return compressedSize - 1;
11,599,387✔
750
  } else if (input[1] == 2) {
×
751
    uError("Invalid decompress string indicator:%d", input[0]);
×
752
    return TSDB_CODE_THIRDPARTY_ERROR;
×
753
  }
754
  return TSDB_CODE_THIRDPARTY_ERROR;
×
755
}
756

757
/* --------------------------------------------Timestamp Compression ---------------------------------------------- */
758
// TODO: Take care here, we assume little endian encoding.
759
//
760
int32_t tsCompressTimestampImp(const char *const input, const int32_t nelements, char *const output) {
630,958✔
761
  int32_t _pos = 1;
630,958✔
762
  int32_t longBytes = LONG_BYTES;
630,958✔
763

764
  if (nelements < 0) {
630,958!
765
    return -1;
×
766
  }
767

768
  if (nelements == 0) return 0;
630,958!
769

770
  int64_t *istream = (int64_t *)input;
630,958✔
771

772
  int64_t prev_value = istream[0];
630,958✔
773
  if (prev_value >= 0x8000000000000000) {
630,958✔
774
    uWarn("compression timestamp is over signed long long range. ts = 0x%" PRIx64 " \n", prev_value);
173!
775
    goto _exit_over;
173✔
776
  }
777
  int64_t  prev_delta = -prev_value;
630,785✔
778
  uint8_t  flags = 0, flag1 = 0, flag2 = 0;
630,785✔
779
  uint64_t dd1 = 0, dd2 = 0;
630,785✔
780

781
  for (int32_t i = 0; i < nelements; i++) {
439,632,953✔
782
    int64_t curr_value = istream[i];
439,002,168✔
783
    if (!safeInt64Add(curr_value, -prev_value)) goto _exit_over;
439,002,168!
784
    int64_t curr_delta = curr_value - prev_value;
439,002,168✔
785
    if (!safeInt64Add(curr_delta, -prev_delta)) goto _exit_over;
439,002,168!
786
    int64_t delta_of_delta = curr_delta - prev_delta;
439,002,168✔
787
    // zigzag encode the value.
788
    uint64_t zigzag_value = ZIGZAG_ENCODE(int64_t, delta_of_delta);
439,002,168✔
789
    if (i % 2 == 0) {
439,002,168✔
790
      flags = 0;
219,525,711✔
791
      dd1 = zigzag_value;
219,525,711✔
792
      if (dd1 == 0) {
219,525,711✔
793
        flag1 = 0;
190,043,916✔
794
      } else {
795
        flag1 = (uint8_t)(LONG_BYTES - BUILDIN_CLZL(dd1) / BITS_PER_BYTE);
29,481,795✔
796
      }
797
    } else {
798
      dd2 = zigzag_value;
219,476,457✔
799
      if (dd2 == 0) {
219,476,457✔
800
        flag2 = 0;
190,260,225✔
801
      } else {
802
        flag2 = (uint8_t)(LONG_BYTES - BUILDIN_CLZL(dd2) / BITS_PER_BYTE);
29,216,232✔
803
      }
804
      flags = flag1 | (flag2 << 4);
219,476,457✔
805
      // Encode the flag.
806
      if ((_pos + CHAR_BYTES - 1) >= nelements * longBytes) goto _exit_over;
219,476,457!
807
      memcpy(output + _pos, &flags, CHAR_BYTES);
219,476,457✔
808
      _pos += CHAR_BYTES;
219,476,457✔
809
      /* Here, we assume it is little endian encoding method. */
810
      // Encode dd1
811
      if (is_bigendian()) {
219,476,457!
812
        if ((_pos + flag1 - 1) >= nelements * longBytes) goto _exit_over;
×
813
        memcpy(output + _pos, (char *)(&dd1) + longBytes - flag1, flag1);
×
814
      } else {
815
        if ((_pos + flag1 - 1) >= nelements * longBytes) goto _exit_over;
219,476,457!
816
        memcpy(output + _pos, (char *)(&dd1), flag1);
219,476,457✔
817
      }
818
      _pos += flag1;
219,476,457✔
819
      // Encode dd2;
820
      if (is_bigendian()) {
219,476,457!
821
        if ((_pos + flag2 - 1) >= nelements * longBytes) goto _exit_over;
×
822
        memcpy(output + _pos, (char *)(&dd2) + longBytes - flag2, flag2);
×
823
      } else {
824
        if ((_pos + flag2 - 1) >= nelements * longBytes) goto _exit_over;
219,476,457!
825
        memcpy(output + _pos, (char *)(&dd2), flag2);
219,476,457✔
826
      }
827
      _pos += flag2;
219,476,457✔
828
    }
829
    prev_value = curr_value;
439,002,168✔
830
    prev_delta = curr_delta;
439,002,168✔
831
  }
832

833
  if (nelements % 2 == 1) {
630,785✔
834
    flag2 = 0;
39,303✔
835
    flags = flag1 | (flag2 << 4);
39,303✔
836
    // Encode the flag.
837
    if ((_pos + CHAR_BYTES - 1) >= nelements * longBytes) goto _exit_over;
39,303!
838
    memcpy(output + _pos, &flags, CHAR_BYTES);
39,303✔
839
    _pos += CHAR_BYTES;
39,303✔
840
    // Encode dd1;
841
    if (is_bigendian()) {
39,303!
842
      if ((_pos + flag1 - 1) >= nelements * longBytes) goto _exit_over;
×
843
      memcpy(output + _pos, (char *)(&dd1) + longBytes - flag1, flag1);
×
844
    } else {
845
      if ((_pos + flag1 - 1) >= nelements * longBytes) goto _exit_over;
39,303✔
846
      memcpy(output + _pos, (char *)(&dd1), flag1);
35,184✔
847
    }
848
    _pos += flag1;
35,184✔
849
  }
850

851
  output[0] = 1;  // Means the string is compressed
626,666✔
852
  return _pos;
626,666✔
853

854
_exit_over:
4,292✔
855
  output[0] = 0;  // Means the string is not compressed
4,292✔
856
  memcpy(output + 1, input, nelements * longBytes);
4,292✔
857
  return nelements * longBytes + 1;
4,292✔
858
}
859

860
int32_t tsDecompressTimestampImp(const char *const input, const int32_t nelements, char *const output) {
5,530,869✔
861
  int64_t longBytes = LONG_BYTES;
5,530,869✔
862

863
  if (nelements < 0) return -1;
5,530,869!
864
  if (nelements == 0) return 0;
5,530,869!
865

866
  if (input[0] == 0) {
5,530,869✔
867
    memcpy(output, input + 1, nelements * longBytes);
721,392✔
868
    return nelements * longBytes;
721,392✔
869
  } else if (input[0] == 1) {  // Decompress
4,809,477!
870
    if (tsSIMDEnable && tsAVX512Enable && tsAVX512Supported) {
4,809,942!
871
      int32_t cnt = tsDecompressTimestampAvx512(input, nelements, output, false);
×
872
      if (cnt >= 0) {
×
873
        return cnt;
×
874
      }
875
    }
876

877
    int64_t *ostream = (int64_t *)output;
4,809,942✔
878

879
    int32_t ipos = 1, opos = 0;
4,809,942✔
880
    int8_t  nbytes = 0;
4,809,942✔
881
    int64_t prev_value = 0;
4,809,942✔
882
    int64_t prev_delta = 0;
4,809,942✔
883
    int64_t delta_of_delta = 0;
4,809,942✔
884

885
    while (1) {
1,095,348,947✔
886
      uint8_t flags = input[ipos++];
1,100,158,889✔
887
      // Decode dd1
888
      uint64_t dd1 = 0;
1,100,158,889✔
889
      nbytes = flags & INT8MASK(4);
1,100,158,889✔
890
      if (nbytes == 0) {
1,100,158,889✔
891
        delta_of_delta = 0;
1,044,044,894✔
892
      } else {
893
        if (is_bigendian()) {
56,113,995!
894
          memcpy(((char *)(&dd1)) + longBytes - nbytes, input + ipos, nbytes);
×
895
        } else {
896
          memcpy(&dd1, input + ipos, nbytes);
56,113,995✔
897
        }
898
        delta_of_delta = ZIGZAG_DECODE(int64_t, dd1);
56,113,995✔
899
      }
900

901
      ipos += nbytes;
1,100,158,889✔
902
      if (opos == 0) {
1,100,158,889✔
903
        prev_value = delta_of_delta;
4,811,169✔
904
        prev_delta = 0;
4,811,169✔
905
        ostream[opos++] = delta_of_delta;
4,811,169✔
906
      } else {
907
        prev_delta = delta_of_delta + prev_delta;
1,095,347,720✔
908
        prev_value = prev_value + prev_delta;
1,095,347,720✔
909
        ostream[opos++] = prev_value;
1,095,347,720✔
910
      }
911
      if (opos == nelements) return nelements * longBytes;
1,102,934,227✔
912

913
      // Decode dd2
914
      uint64_t dd2 = 0;
1,098,124,285✔
915
      nbytes = (flags >> 4) & INT8MASK(4);
1,098,124,285✔
916
      if (nbytes == 0) {
1,098,124,285✔
917
        delta_of_delta = 0;
1,045,394,781✔
918
      } else {
919
        if (is_bigendian()) {
52,729,504!
920
          memcpy(((char *)(&dd2)) + longBytes - nbytes, input + ipos, nbytes);
×
921
        } else {
922
          memcpy(&dd2, input + ipos, nbytes);
52,729,504✔
923
        }
924
        // zigzag_decoding
925
        delta_of_delta = ZIGZAG_DECODE(int64_t, dd2);
52,729,504✔
926
      }
927
      ipos += nbytes;
1,098,124,285✔
928
      prev_delta = delta_of_delta + prev_delta;
1,098,124,285✔
929
      prev_value = prev_value + prev_delta;
1,098,124,285✔
930
      ostream[opos++] = prev_value;
1,098,124,285✔
931
      if (opos == nelements) return nelements * longBytes;
1,098,124,285✔
932
    }
933
  }
934

935
  return nelements * longBytes;
×
936
}
937

938
int32_t tsCompressPlain2(const char *const input, const int32_t nelements, char *const output, const char type) {
×
939
  int32_t bytes = tDataTypes[type].bytes * nelements;
×
940
  output[0] = 0;
×
941
  memcpy(output + 1, input, bytes);
×
942
  return bytes + 1;
×
943
}
944
int32_t tsDecompressPlain2(const char *const input, int32_t ninput, const int32_t nelements, char *const output,
×
945
                           const char type) {
946
  int32_t bytes = tDataTypes[type].bytes * nelements;
×
947
  memcpy(output, input + 1, bytes);
×
948
  return bytes;
×
949
}
950
// Timestamp compressor in the *Imp2 signature; `type` is not used and the call
// is forwarded to tsCompressTimestampImp.
int32_t tsCompressTimestampImp2(const char *const input, const int32_t nelements, char *const output, const char type) {
  (void)type;
  const int32_t written = tsCompressTimestampImp(input, nelements, output);
  return written;
}
953
// Timestamp decompressor in the *Imp2 signature; `ninput` and `type` are not
// used and the call is forwarded to tsDecompressTimestampImp.
int32_t tsDecompressTimestampImp2(const char *const input, int32_t ninput, const int32_t nelements, char *const output,
                                  const char type) {
  (void)ninput;
  (void)type;
  const int32_t produced = tsDecompressTimestampImp(input, nelements, output);
  return produced;
}
957

958
/* --------------------------------------------Double Compression ---------------------------------------------- */
959
void encodeDoubleValue(uint64_t diff, uint8_t flag, char *const output, int32_t *const pos) {
59,765,872✔
960
  int32_t longBytes = LONG_BYTES;
59,765,872✔
961

962
  uint8_t nbytes = (flag & INT8MASK(3)) + 1;
59,765,872✔
963
  int32_t nshift = (longBytes * BITS_PER_BYTE - nbytes * BITS_PER_BYTE) * (flag >> 3);
59,765,872✔
964
  diff >>= nshift;
59,765,872✔
965

966
  while (nbytes) {
359,163,708✔
967
    output[(*pos)++] = (int8_t)(diff & INT64MASK(8));
299,397,836✔
968
    diff >>= BITS_PER_BYTE;
299,397,836✔
969
    nbytes--;
299,397,836✔
970
  }
971
}
59,765,872✔
972

973
int32_t tsCompressDoubleImp(const char *const input, const int32_t nelements, char *const output) {
115,179✔
974
  int32_t byte_limit = nelements * DOUBLE_BYTES + 1;
115,179✔
975
  int32_t opos = 1;
115,179✔
976

977
  uint64_t prev_value = 0;
115,179✔
978
  uint64_t prev_diff = 0;
115,179✔
979
  uint8_t  prev_flag = 0;
115,179✔
980

981
  double *istream = (double *)input;
115,179✔
982

983
  // Main loop
984
  for (int32_t i = 0; i < nelements; i++) {
60,239,988!
985
    union {
986
      double   real;
987
      uint64_t bits;
988
    } curr;
989

990
    curr.real = istream[i];
60,439,964✔
991

992
    // Here we assume the next value is the same as previous one.
993
    uint64_t predicted = prev_value;
60,439,964✔
994
    uint64_t diff = curr.bits ^ predicted;
60,439,964✔
995

996
    int32_t leading_zeros = LONG_BYTES * BITS_PER_BYTE;
60,439,964✔
997
    int32_t trailing_zeros = leading_zeros;
60,439,964✔
998

999
    if (diff) {
60,439,964✔
1000
      trailing_zeros = BUILDIN_CTZL(diff);
54,301,327✔
1001
      leading_zeros = BUILDIN_CLZL(diff);
54,301,327✔
1002
    }
1003

1004
    uint8_t nbytes = 0;
60,439,964✔
1005
    uint8_t flag;
1006

1007
    if (trailing_zeros > leading_zeros) {
60,439,964✔
1008
      nbytes = (uint8_t)(LONG_BYTES - trailing_zeros / BITS_PER_BYTE);
27,331,009✔
1009

1010
      if (nbytes > 0) nbytes--;
27,331,009!
1011
      flag = ((uint8_t)1 << 3) | nbytes;
27,331,009✔
1012
    } else {
1013
      nbytes = (uint8_t)(LONG_BYTES - leading_zeros / BITS_PER_BYTE);
33,108,955✔
1014
      if (nbytes > 0) nbytes--;
33,108,955✔
1015
      flag = nbytes;
33,108,955✔
1016
    }
1017

1018
    if (i % 2 == 0) {
60,439,964✔
1019
      prev_diff = diff;
30,233,260✔
1020
      prev_flag = flag;
30,233,260✔
1021
    } else {
1022
      int32_t nbyte1 = (prev_flag & INT8MASK(3)) + 1;
30,206,704✔
1023
      int32_t nbyte2 = (flag & INT8MASK(3)) + 1;
30,206,704✔
1024
      if (opos + 1 + nbyte1 + nbyte2 <= byte_limit) {
30,206,704✔
1025
        uint8_t flags = prev_flag | (flag << 4);
30,122,549✔
1026
        output[opos++] = flags;
30,122,549✔
1027
        encodeDoubleValue(prev_diff, prev_flag, output, &opos);
30,122,549✔
1028
        encodeDoubleValue(diff, flag, output, &opos);
30,154,278✔
1029
      } else {
1030
        output[0] = 1;
84,155✔
1031
        memcpy(output + 1, input, byte_limit - 1);
84,155✔
1032
        return byte_limit;
84,155✔
1033
      }
1034
    }
1035
    prev_value = curr.bits;
60,124,809✔
1036
  }
1037

1038
  if (nelements % 2) {
×
1039
    int32_t nbyte1 = (prev_flag & INT8MASK(3)) + 1;
7,296✔
1040
    int32_t nbyte2 = 1;
7,296✔
1041
    if (opos + 1 + nbyte1 + nbyte2 <= byte_limit) {
7,296✔
1042
      uint8_t flags = prev_flag;
3,317✔
1043
      output[opos++] = flags;
3,317✔
1044
      encodeDoubleValue(prev_diff, prev_flag, output, &opos);
3,317✔
1045
      encodeDoubleValue(0ul, 0, output, &opos);
3,318✔
1046
    } else {
1047
      output[0] = 1;
3,979✔
1048
      memcpy(output + 1, input, byte_limit - 1);
3,979✔
1049
      return byte_limit;
3,979✔
1050
    }
1051
  }
1052

1053
  output[0] = 0;
101,761✔
1054
  return opos;
101,761✔
1055
}
1056

1057
// Reads one XOR difference written by encodeDoubleValue, advancing *ipos.
// Bits 0..2 of `flag` give (byte count - 1); the bytes are assembled
// least-significant first and, when bit 3 of `flag` is set, shifted back up
// into the high-order positions.
FORCE_INLINE uint64_t decodeDoubleValue(const char *const input, int32_t *const ipos, uint8_t flag) {
  const int32_t longBytes = LONG_BYTES;
  const int32_t count = (flag & 0x7) + 1;

  uint64_t value = 0ul;
  int32_t  cursor = *ipos;
  for (int32_t k = 0; k < count; k++) {
    value |= (((uint64_t)0xff & input[cursor++]) << BITS_PER_BYTE * k);
  }
  *ipos = cursor;

  const int32_t shift_width = (longBytes * BITS_PER_BYTE - count * BITS_PER_BYTE) * (flag >> 3);
  return value << shift_width;
}
1070

1071
static int32_t tsDecompressDoubleImpHelper(const char *input, int32_t nelements, char *output) {
82,901✔
1072
  double  *ostream = (double *)output;
82,901✔
1073
  uint8_t  flags = 0;
82,901✔
1074
  int32_t  ipos = 0;
82,901✔
1075
  int32_t  opos = 0;
82,901✔
1076
  uint64_t diff = 0;
82,901✔
1077
  union {
1078
    uint64_t bits;
1079
    double   real;
1080
  } curr;
1081

1082
  curr.bits = 0;
82,901✔
1083

1084
  for (int32_t i = 0; i < nelements; i++) {
48,603,417✔
1085
    if ((i & 0x01) == 0) {
48,520,516✔
1086
      flags = input[ipos++];
24,358,187✔
1087
    }
1088

1089
    diff = decodeDoubleValue(input, &ipos, flags & INT8MASK(4));
48,520,516✔
1090
    flags >>= 4;
48,520,516✔
1091
    curr.bits ^= diff;
48,520,516✔
1092

1093
    ostream[opos++] = curr.real;
48,520,516✔
1094
  }
1095

1096
  return nelements * DOUBLE_BYTES;
82,901✔
1097
}
1098

1099
int32_t tsDecompressDoubleImp(const char *const input, int32_t ninput, const int32_t nelements, char *const output) {
370,187✔
1100
  // return the result directly if there is no compression
1101
  if (input[0] == 1) {
370,187✔
1102
    memcpy(output, input + 1, nelements * DOUBLE_BYTES);
233,033✔
1103
    return nelements * DOUBLE_BYTES;
233,033✔
1104
  }
1105

1106
  // use AVX2 implementation when allowed and the compression ratio is not high
1107
  double compressRatio = 1.0 * nelements * DOUBLE_BYTES / ninput;
137,154✔
1108
  if (tsSIMDEnable && tsAVX2Supported && compressRatio < 2) {
137,154!
1109
    int32_t cnt = tsDecompressDoubleImpAvx2(input + 1, nelements, output);
54,301✔
1110
    if (cnt >= 0) {
54,303!
1111
      return cnt;
54,303✔
1112
    }
1113
  }
1114

1115
  // use implementation without SIMD instructions by default
1116
  return tsDecompressDoubleImpHelper(input + 1, nelements, output);
82,853✔
1117
}
1118

1119
/* --------------------------------------------Float Compression ---------------------------------------------- */
1120
void encodeFloatValue(uint32_t diff, uint8_t flag, char *const output, int32_t *const pos) {
86,062,382✔
1121
  uint8_t nbytes = (flag & INT8MASK(3)) + 1;
86,062,382✔
1122
  int32_t nshift = (FLOAT_BYTES * BITS_PER_BYTE - nbytes * BITS_PER_BYTE) * (flag >> 3);
86,062,382✔
1123
  diff >>= nshift;
86,062,382✔
1124

1125
  while (nbytes) {
340,594,840✔
1126
    output[(*pos)++] = (int8_t)(diff & INT32MASK(8));
254,532,458✔
1127
    diff >>= BITS_PER_BYTE;
254,532,458✔
1128
    nbytes--;
254,532,458✔
1129
  }
1130
}
86,062,382✔
1131

1132
int32_t tsCompressFloatImp(const char *const input, const int32_t nelements, char *const output) {
181,211✔
1133
  float  *istream = (float *)input;
181,211✔
1134
  int32_t byte_limit = nelements * FLOAT_BYTES + 1;
181,211✔
1135
  int32_t opos = 1;
181,211✔
1136

1137
  uint32_t prev_value = 0;
181,211✔
1138
  uint32_t prev_diff = 0;
181,211✔
1139
  uint8_t  prev_flag = 0;
181,211✔
1140

1141
  // Main loop
1142
  for (int32_t i = 0; i < nelements; i++) {
86,374,076✔
1143
    union {
1144
      float    real;
1145
      uint32_t bits;
1146
    } curr;
1147

1148
    curr.real = istream[i];
86,307,349✔
1149

1150
    // Here we assume the next value is the same as previous one.
1151
    uint32_t predicted = prev_value;
86,307,349✔
1152
    uint32_t diff = curr.bits ^ predicted;
86,307,349✔
1153

1154
    int32_t clz = FLOAT_BYTES * BITS_PER_BYTE;
86,307,349✔
1155
    int32_t ctz = clz;
86,307,349✔
1156

1157
    if (diff) {
86,307,349✔
1158
      ctz = BUILDIN_CTZ(diff);
80,988,720✔
1159
      clz = BUILDIN_CLZ(diff);
80,988,720✔
1160
    }
1161

1162
    uint8_t nbytes = 0;
86,307,349✔
1163
    uint8_t flag;
1164

1165
    if (ctz > clz) {
86,307,349✔
1166
      nbytes = (uint8_t)(FLOAT_BYTES - ctz / BITS_PER_BYTE);
11,197,928✔
1167

1168
      if (nbytes > 0) nbytes--;
11,197,928✔
1169
      flag = ((uint8_t)1 << 3) | nbytes;
11,197,928✔
1170
    } else {
1171
      nbytes = (uint8_t)(FLOAT_BYTES - clz / BITS_PER_BYTE);
75,109,421✔
1172
      if (nbytes > 0) nbytes--;
75,109,421✔
1173
      flag = nbytes;
75,109,421✔
1174
    }
1175

1176
    if (i % 2 == 0) {
86,307,349✔
1177
      prev_diff = diff;
43,229,453✔
1178
      prev_flag = flag;
43,229,453✔
1179
    } else {
1180
      int32_t nbyte1 = (prev_flag & INT8MASK(3)) + 1;
43,077,896✔
1181
      int32_t nbyte2 = (flag & INT8MASK(3)) + 1;
43,077,896✔
1182
      if (opos + 1 + nbyte1 + nbyte2 <= byte_limit) {
43,077,896!
1183
        uint8_t flags = prev_flag | (flag << 4);
43,239,877✔
1184
        output[opos++] = flags;
43,239,877✔
1185
        encodeFloatValue(prev_diff, prev_flag, output, &opos);
43,239,877✔
1186
        encodeFloatValue(diff, flag, output, &opos);
43,213,159✔
1187
      } else {
1188
        output[0] = 1;
×
1189
        memcpy(output + 1, input, byte_limit - 1);
×
1190
        return byte_limit;
×
1191
      }
1192
    }
1193
    prev_value = curr.bits;
86,192,865✔
1194
  }
1195

1196
  if (nelements % 2) {
66,727✔
1197
    int32_t nbyte1 = (prev_flag & INT8MASK(3)) + 1;
4,799✔
1198
    int32_t nbyte2 = 1;
4,799✔
1199
    if (opos + 1 + nbyte1 + nbyte2 <= byte_limit) {
4,799✔
1200
      uint8_t flags = prev_flag;
1,753✔
1201
      output[opos++] = flags;
1,753✔
1202
      encodeFloatValue(prev_diff, prev_flag, output, &opos);
1,753✔
1203
      encodeFloatValue(0, 0, output, &opos);
1,752✔
1204
    } else {
1205
      output[0] = 1;
3,046✔
1206
      memcpy(output + 1, input, byte_limit - 1);
3,046✔
1207
      return byte_limit;
3,046✔
1208
    }
1209
  }
1210

1211
  output[0] = 0;
161,455✔
1212
  return opos;
161,455✔
1213
}
1214

1215
uint32_t decodeFloatValue(const char *const input, int32_t *const ipos, uint8_t flag) {
2,700,960✔
1216
  uint32_t diff = 0ul;
2,700,960✔
1217
  int32_t  nbytes = (flag & INT8MASK(3)) + 1;
2,700,960✔
1218
  for (int32_t i = 0; i < nbytes; i++) {
5,407,223✔
1219
    diff = diff | ((INT32MASK(8) & input[(*ipos)++]) << BITS_PER_BYTE * i);
2,706,263✔
1220
  }
1221
  int32_t shift_width = (FLOAT_BYTES * BITS_PER_BYTE - nbytes * BITS_PER_BYTE) * (flag >> 3);
2,700,960✔
1222
  diff <<= shift_width;
2,700,960✔
1223

1224
  return diff;
2,700,960✔
1225
}
1226

1227
static int32_t tsDecompressFloatImpHelper(const char *input, int32_t nelements, char *output) {
1,266✔
1228
  float   *ostream = (float *)output;
1,266✔
1229
  uint8_t  flags = 0;
1,266✔
1230
  int32_t  ipos = 0;
1,266✔
1231
  int32_t  opos = 0;
1,266✔
1232
  uint32_t diff = 0;
1,266✔
1233
  union {
1234
    uint32_t bits;
1235
    float    real;
1236
  } curr;
1237

1238
  curr.bits = 0;
1,266✔
1239

1240
  for (int32_t i = 0; i < nelements; i++) {
2,702,254✔
1241
    if (i % 2 == 0) {
2,700,973✔
1242
      flags = input[ipos++];
1,351,006✔
1243
    }
1244

1245
    diff = decodeFloatValue(input, &ipos, flags & INT8MASK(4));
2,700,973✔
1246
    flags >>= 4;
2,700,988✔
1247
    curr.bits ^= diff;
2,700,988✔
1248

1249
    ostream[opos++] = curr.real;
2,700,988✔
1250
  }
1251

1252
  return nelements * FLOAT_BYTES;
1,281✔
1253
}
1254

1255
int32_t tsDecompressFloatImp(const char *const input, int32_t ninput, const int32_t nelements, char *const output) {
438,643✔
1256
  if (input[0] == 1) {
438,643✔
1257
    memcpy(output, input + 1, nelements * FLOAT_BYTES);
240,348✔
1258
    return nelements * FLOAT_BYTES;
240,348✔
1259
  }
1260

1261
  // use AVX2 implementation when allowed and the compression ratio is not high
1262
  double compressRatio = 1.0 * nelements * FLOAT_BYTES / ninput;
198,295✔
1263
  if (tsSIMDEnable && tsAVX2Supported && compressRatio < 2) {
198,295!
1264
    int32_t cnt =  tsDecompressFloatImpAvx2(input + 1, nelements, output);
197,118✔
1265
    if (cnt >= 0) {
197,133!
1266
      return cnt;
197,137✔
1267
    }
1268
  }
1269

1270
  // use implementation without SIMD instructions by default
1271
  return tsDecompressFloatImpHelper(input + 1, nelements, output);
1,173✔
1272
}
1273

1274
//
1275
//   ----------  float double lossy  -----------
1276
//
1277
int32_t tsCompressFloatLossyImp(const char *input, const int32_t nelements, char *const output) {
×
1278
  // compress with sz
1279
  int32_t       compressedSize = tdszCompress(SZ_FLOAT, input, nelements, output + 1);
×
1280
  unsigned char algo = ALGO_SZ_LOSSY << 1;
×
1281
  if (compressedSize == 0 || compressedSize >= nelements * sizeof(float)) {
×
1282
    // compressed error or large than original
1283
    output[0] = MODE_NOCOMPRESS | algo;
×
1284
    memcpy(output + 1, input, nelements * sizeof(float));
×
1285
    compressedSize = 1 + nelements * sizeof(float);
×
1286
  } else {
1287
    // compressed successfully
1288
    output[0] = MODE_COMPRESS | algo;
×
1289
    compressedSize += 1;
×
1290
  }
1291

1292
  return compressedSize;
×
1293
}
1294

1295
int32_t tsDecompressFloatLossyImp(const char *input, int32_t compressedSize, const int32_t nelements,
×
1296
                                  char *const output) {
1297
  int32_t decompressedSize = 0;
×
1298
  if (HEAD_MODE(input[0]) == MODE_NOCOMPRESS) {
×
1299
    // orginal so memcpy directly
1300
    decompressedSize = nelements * sizeof(float);
×
1301
    memcpy(output, input + 1, decompressedSize);
×
1302

1303
    return decompressedSize;
×
1304
  }
1305

1306
  // decompressed with sz
1307
  return tdszDecompress(SZ_FLOAT, input + 1, compressedSize - 1, nelements, output);
×
1308
}
1309

1310
int32_t tsCompressDoubleLossyImp(const char *input, const int32_t nelements, char *const output) {
×
1311
  // compress with sz
1312
  int32_t       compressedSize = tdszCompress(SZ_DOUBLE, input, nelements, output + 1);
×
1313
  unsigned char algo = ALGO_SZ_LOSSY << 1;
×
1314
  if (compressedSize == 0 || compressedSize >= nelements * sizeof(double)) {
×
1315
    // compressed error or large than original
1316
    output[0] = MODE_NOCOMPRESS | algo;
×
1317
    memcpy(output + 1, input, nelements * sizeof(double));
×
1318
    compressedSize = 1 + nelements * sizeof(double);
×
1319
  } else {
1320
    // compressed successfully
1321
    output[0] = MODE_COMPRESS | algo;
×
1322
    compressedSize += 1;
×
1323
  }
1324

1325
  return compressedSize;
×
1326
}
1327

1328
int32_t tsDecompressDoubleLossyImp(const char *input, int32_t compressedSize, const int32_t nelements,
×
1329
                                   char *const output) {
1330
  int32_t decompressedSize = 0;
×
1331
  if (HEAD_MODE(input[0]) == MODE_NOCOMPRESS) {
×
1332
    // orginal so memcpy directly
1333
    decompressedSize = nelements * sizeof(double);
×
1334
    memcpy(output, input + 1, decompressedSize);
×
1335

1336
    return decompressedSize;
×
1337
  }
1338

1339
  // decompressed with sz
1340
  return tdszDecompress(SZ_DOUBLE, input + 1, compressedSize - 1, nelements, output);
×
1341
}
1342

1343
/*************************************************************************
1344
 *                  REGULAR COMPRESSION
1345
 *************************************************************************/
1346
// Timestamp =====================================================
1347
int32_t tsCompressTimestamp(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint8_t cmprAlg, void *pBuf,
50✔
1348
                            int32_t nBuf) {
1349
  if (cmprAlg == ONE_STAGE_COMP) {
50!
1350
    return tsCompressTimestampImp(pIn, nEle, pOut);
×
1351
  } else if (cmprAlg == TWO_STAGE_COMP) {
50!
1352
    int32_t len = tsCompressTimestampImp(pIn, nEle, pBuf);
50✔
1353
    return tsCompressStringImp(pBuf, len, pOut, nOut);
50✔
1354
  } else {
1355
    // tDataTypeCompress[TSDB_DATA_TYPE_TIMESTAMP].compFunc(pIn, nIn, nEle, pOut, nOut, );
1356
  }
1357
  return 0;
×
1358
}
1359

1360
int32_t tsDecompressTimestamp(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint8_t cmprAlg,
2✔
1361
                              void *pBuf, int32_t nBuf) {
1362
  if (cmprAlg == ONE_STAGE_COMP) {
2!
1363
    return tsDecompressTimestampImp(pIn, nEle, pOut);
×
1364
  } else if (cmprAlg == TWO_STAGE_COMP) {
2!
1365
    if (tsDecompressStringImp(pIn, nIn, pBuf, nBuf) < 0) return -1;
2!
1366
    return tsDecompressTimestampImp(pBuf, nEle, pOut);
2✔
1367
  } else {
1368
    return -1;
×
1369
  }
1370
}
1371

1372
// Float =====================================================
1373
int32_t tsCompressFloat(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint8_t cmprAlg, void *pBuf,
×
1374
                        int32_t nBuf) {
1375
  // lossy mode
1376
  if (lossyFloat) {
×
1377
    return tsCompressFloatLossyImp(pIn, nEle, pOut);
×
1378
    // lossless mode
1379
  } else {
1380
    if (cmprAlg == ONE_STAGE_COMP) {
×
1381
      return tsCompressFloatImp(pIn, nEle, pOut);
×
1382
    } else if (cmprAlg == TWO_STAGE_COMP) {
×
1383
      int32_t len = tsCompressFloatImp(pIn, nEle, pBuf);
×
1384
      return tsCompressStringImp(pBuf, len, pOut, nOut);
×
1385
    } else {
1386
      return TSDB_CODE_INVALID_PARA;
×
1387
    }
1388
  }
1389
}
1390

1391
int32_t tsDecompressFloat(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint8_t cmprAlg, void *pBuf,
×
1392
                          int32_t nBuf) {
1393
  if (HEAD_ALGO(((uint8_t *)pIn)[0]) == ALGO_SZ_LOSSY) {
×
1394
    // decompress lossy
1395
    return tsDecompressFloatLossyImp(pIn, nIn, nEle, pOut);
×
1396
  } else {
1397
    // decompress lossless
1398
    if (cmprAlg == ONE_STAGE_COMP) {
×
1399
      return tsDecompressFloatImp(pIn, nIn, nEle, pOut);
×
1400
    } else if (cmprAlg == TWO_STAGE_COMP) {
×
1401
      int32_t bufLen = tsDecompressStringImp(pIn, nIn, pBuf, nBuf);
×
1402
      if (bufLen < 0) return -1;
×
1403
      return tsDecompressFloatImp(pBuf, bufLen, nEle, pOut);
×
1404
    } else {
1405
      return TSDB_CODE_INVALID_PARA;
×
1406
    }
1407
  }
1408
}
1409

1410
// Double =====================================================
1411
int32_t tsCompressDouble(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint8_t cmprAlg, void *pBuf,
×
1412
                         int32_t nBuf) {
1413
  if (lossyDouble) {
×
1414
    // lossy mode
1415
    return tsCompressDoubleLossyImp(pIn, nEle, pOut);
×
1416
  } else {
1417
    // lossless mode
1418
    if (cmprAlg == ONE_STAGE_COMP) {
×
1419
      return tsCompressDoubleImp(pIn, nEle, pOut);
×
1420
    } else if (cmprAlg == TWO_STAGE_COMP) {
×
1421
      int32_t len = tsCompressDoubleImp(pIn, nEle, pBuf);
×
1422
      return tsCompressStringImp(pBuf, len, pOut, nOut);
×
1423
    } else {
1424
      return TSDB_CODE_INVALID_PARA;
×
1425
    }
1426
  }
1427
}
1428

1429
int32_t tsDecompressDouble(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint8_t cmprAlg, void *pBuf,
×
1430
                           int32_t nBuf) {
1431
  if (HEAD_ALGO(((uint8_t *)pIn)[0]) == ALGO_SZ_LOSSY) {
×
1432
    // decompress lossy
1433
    return tsDecompressDoubleLossyImp(pIn, nIn, nEle, pOut);
×
1434
  } else {
1435
    // decompress lossless
1436
    if (cmprAlg == ONE_STAGE_COMP) {
×
1437
      return tsDecompressDoubleImp(pIn, nIn, nEle, pOut);
×
1438
    } else if (cmprAlg == TWO_STAGE_COMP) {
×
1439
      int32_t bufLen = tsDecompressStringImp(pIn, nIn, pBuf, nBuf);
×
1440
      if (bufLen < 0) return -1;
×
1441
      return tsDecompressDoubleImp(pBuf, bufLen, nEle, pOut);
×
1442
    } else {
1443
      return TSDB_CODE_INVALID_PARA;
×
1444
    }
1445
  }
1446
}
1447

1448
// Binary =====================================================
1449
// Compress nIn bytes at pIn into pOut by forwarding to tsCompressStringImp.
// Only pIn, nIn, pOut and nOut are used; nEle, cmprAlg, pBuf and nBuf are ignored.
// Returns the result of tsCompressStringImp unchanged.
int32_t tsCompressString(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint8_t cmprAlg, void *pBuf,
                         int32_t nBuf) {
  int32_t result = tsCompressStringImp(pIn, nIn, pOut, nOut);
  return result;
}
1453

1454
// Decompress nIn bytes at pIn into pOut by forwarding to tsDecompressStringImp.
// Only pIn, nIn, pOut and nOut are used; nEle, cmprAlg, pBuf and nBuf are ignored.
// Returns the result of tsDecompressStringImp unchanged.
int32_t tsDecompressString(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint8_t cmprAlg, void *pBuf,
                           int32_t nBuf) {
  int32_t result = tsDecompressStringImp(pIn, nIn, pOut, nOut);
  return result;
}
1458

1459
// Bool =====================================================
1460
int32_t tsCompressBool(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint8_t cmprAlg, void *pBuf,
×
1461
                       int32_t nBuf) {
1462
  if (cmprAlg == ONE_STAGE_COMP) {
×
1463
    return tsCompressBoolImp(pIn, nEle, pOut);
×
1464
  } else if (cmprAlg == TWO_STAGE_COMP) {
×
1465
    int32_t len = tsCompressBoolImp(pIn, nEle, pBuf);
×
1466
    if (len < 0) {
×
1467
      return TSDB_CODE_THIRDPARTY_ERROR;
×
1468
    }
1469
    return tsCompressStringImp(pBuf, len, pOut, nOut);
×
1470
  } else {
1471
    return TSDB_CODE_THIRDPARTY_ERROR;
×
1472
  }
1473
}
1474

1475
int32_t tsDecompressBool(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint8_t cmprAlg, void *pBuf,
×
1476
                         int32_t nBuf) {
1477
  int32_t code = 0;
×
1478
  if (cmprAlg == ONE_STAGE_COMP) {
×
1479
    return tsDecompressBoolImp(pIn, nEle, pOut);
×
1480
  } else if (cmprAlg == TWO_STAGE_COMP) {
×
1481
    if ((code = tsDecompressStringImp(pIn, nIn, pBuf, nBuf)) < 0) return code;
×
1482
    return tsDecompressBoolImp(pBuf, nEle, pOut);
×
1483
  } else {
1484
    return TSDB_CODE_INVALID_PARA;
×
1485
  }
1486
}
1487

1488
// Tinyint =====================================================
1489
int32_t tsCompressTinyint(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint8_t cmprAlg, void *pBuf,
50✔
1490
                          int32_t nBuf) {
1491
  if (cmprAlg == ONE_STAGE_COMP) {
50!
1492
    return tsCompressINTImp(pIn, nEle, pOut, TSDB_DATA_TYPE_TINYINT);
×
1493
  } else if (cmprAlg == TWO_STAGE_COMP) {
50!
1494
    int32_t len = tsCompressINTImp(pIn, nEle, pBuf, TSDB_DATA_TYPE_TINYINT);
50✔
1495
    if (len < 0) {
50!
1496
      return TSDB_CODE_THIRDPARTY_ERROR;
×
1497
    }
1498
    return tsCompressStringImp(pBuf, len, pOut, nOut);
50✔
1499
  } else {
1500
    return TSDB_CODE_INVALID_PARA;
×
1501
  }
1502
}
1503

1504
int32_t tsDecompressTinyint(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint8_t cmprAlg, void *pBuf,
2✔
1505
                            int32_t nBuf) {
1506
  int32_t code = 0;
2✔
1507
  if (cmprAlg == ONE_STAGE_COMP) {
2!
1508
    return tsDecompressINTImp(pIn, nEle, pOut, TSDB_DATA_TYPE_TINYINT);
×
1509
  } else if (cmprAlg == TWO_STAGE_COMP) {
2!
1510
    if ((code = tsDecompressStringImp(pIn, nIn, pBuf, nBuf)) < 0) return code;
2!
1511
    return tsDecompressINTImp(pBuf, nEle, pOut, TSDB_DATA_TYPE_TINYINT);
2✔
1512
  } else {
1513
    return TSDB_CODE_INVALID_PARA;
×
1514
  }
1515
}
1516

1517
// Smallint =====================================================
1518
int32_t tsCompressSmallint(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint8_t cmprAlg, void *pBuf,
50✔
1519
                           int32_t nBuf) {
1520
  if (cmprAlg == ONE_STAGE_COMP) {
50!
1521
    return tsCompressINTImp(pIn, nEle, pOut, TSDB_DATA_TYPE_SMALLINT);
×
1522
  } else if (cmprAlg == TWO_STAGE_COMP) {
50!
1523
    int32_t len = tsCompressINTImp(pIn, nEle, pBuf, TSDB_DATA_TYPE_SMALLINT);
50✔
1524
    if (len < 0) {
50!
1525
      return TSDB_CODE_THIRDPARTY_ERROR;
×
1526
    }
1527
    return tsCompressStringImp(pBuf, len, pOut, nOut);
50✔
1528
  } else {
1529
    return TSDB_CODE_INVALID_PARA;
×
1530
  }
1531
}
1532

1533
int32_t tsDecompressSmallint(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint8_t cmprAlg,
2✔
1534
                             void *pBuf, int32_t nBuf) {
1535
  int32_t code = 0;
2✔
1536
  if (cmprAlg == ONE_STAGE_COMP) {
2!
1537
    return tsDecompressINTImp(pIn, nEle, pOut, TSDB_DATA_TYPE_SMALLINT);
×
1538
  } else if (cmprAlg == TWO_STAGE_COMP) {
2!
1539
    if ((code = tsDecompressStringImp(pIn, nIn, pBuf, nBuf)) < 0) return code;
2!
1540
    return tsDecompressINTImp(pBuf, nEle, pOut, TSDB_DATA_TYPE_SMALLINT);
2✔
1541
  } else {
1542
    return TSDB_CODE_INVALID_PARA;
×
1543
  }
1544
}
1545

1546
// Int =====================================================
1547
int32_t tsCompressInt(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint8_t cmprAlg, void *pBuf,
16,612✔
1548
                      int32_t nBuf) {
1549
  if (cmprAlg == ONE_STAGE_COMP) {
16,612!
1550
    return tsCompressINTImp(pIn, nEle, pOut, TSDB_DATA_TYPE_INT);
×
1551
  } else if (cmprAlg == TWO_STAGE_COMP) {
16,612!
1552
    int32_t len = tsCompressINTImp(pIn, nEle, pBuf, TSDB_DATA_TYPE_INT);
16,612✔
1553
    if (len < 0) {
16,614!
1554
      return TSDB_CODE_THIRDPARTY_ERROR;
×
1555
    }
1556
    return tsCompressStringImp(pBuf, len, pOut, nOut);
16,614✔
1557
  } else {
1558
    return TSDB_CODE_INVALID_PARA;
×
1559
  }
1560
}
1561

1562
int32_t tsDecompressInt(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint8_t cmprAlg, void *pBuf,
1,218,819✔
1563
                        int32_t nBuf) {
1564
  int32_t code = 0;
1,218,819✔
1565
  if (cmprAlg == ONE_STAGE_COMP) {
1,218,819!
1566
    return tsDecompressINTImp(pIn, nEle, pOut, TSDB_DATA_TYPE_INT);
×
1567
  } else if (cmprAlg == TWO_STAGE_COMP) {
1,218,819!
1568
    if ((code = tsDecompressStringImp(pIn, nIn, pBuf, nBuf)) < 0) return code;
1,218,866!
1569
    return tsDecompressINTImp(pBuf, nEle, pOut, TSDB_DATA_TYPE_INT);
1,218,909✔
1570
  } else {
1571
    return TSDB_CODE_INVALID_PARA;
×
1572
  }
1573
}
1574

1575
// Bigint =====================================================
1576
int32_t tsCompressBigint(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint8_t cmprAlg, void *pBuf,
802,846✔
1577
                         int32_t nBuf) {
1578
  if (cmprAlg == ONE_STAGE_COMP) {
802,846!
1579
    return tsCompressINTImp(pIn, nEle, pOut, TSDB_DATA_TYPE_BIGINT);
×
1580
  } else if (cmprAlg == TWO_STAGE_COMP) {
802,846!
1581
    int32_t len = tsCompressINTImp(pIn, nEle, pBuf, TSDB_DATA_TYPE_BIGINT);
802,864✔
1582
    if (len < 0) {
802,931!
1583
      return TSDB_CODE_THIRDPARTY_ERROR;
×
1584
    }
1585
    return tsCompressStringImp(pBuf, len, pOut, nOut);
802,931✔
1586
  } else {
1587
    return TSDB_CODE_INVALID_PARA;
×
1588
  }
1589
}
1590

1591
int32_t tsDecompressBigint(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint8_t cmprAlg, void *pBuf,
11,345,406✔
1592
                           int32_t nBuf) {
1593
  int32_t code = 0;
11,345,406✔
1594
  if (cmprAlg == ONE_STAGE_COMP) {
11,345,406!
1595
    return tsDecompressINTImp(pIn, nEle, pOut, TSDB_DATA_TYPE_BIGINT);
×
1596
  } else if (cmprAlg == TWO_STAGE_COMP) {
11,345,406!
1597
    if ((code = tsDecompressStringImp(pIn, nIn, pBuf, nBuf)) < 0) return code;
11,345,625!
1598
    return tsDecompressINTImp(pBuf, nEle, pOut, TSDB_DATA_TYPE_BIGINT);
11,345,792✔
1599
  } else {
1600
    return TSDB_CODE_INVALID_PARA;
×
1601
  }
1602
}
1603

1604
// Shared body of the "*2" compress/decompress entry points. It must be expanded
// inside a function returning int32_t, because every path ends in `return`.
//
// DEFINE_VAR(alg) supplies l1, l2 and lvl, which select the path:
//   - l1 enabled,  l2 disabled: run only the L1 encoder/decoder between pIn and pOut.
//   - l1 enabled,  l2 enabled : compress   = L1 encode into pBuf, then L2 compress into pOut;
//                               decompress = L2 decompress into pBuf, then L1 decode into pOut.
//   - l1 disabled, l2 enabled : run only the L2 compressor/decompressor between pIn and pOut.
//   - both disabled           : TSDB_CODE_INVALID_PARA.
// `compress` is non-zero for the compress direction and zero for decompress.
// A negative L1 length on compress is returned as-is; a negative L2 length on
// decompress is reported as -1.
//
// The trace messages of the L2-only path name the L2 algorithm through
// compressL2Dict[l2]; l1 is L1_DISABLED on that path and must not be used as an
// index into the L2 dictionary.
//
// The trailing unconditional return means the do/while body runs exactly once.
#define FUNC_COMPRESS_IMPL(pIn, nIn, nEle, pOut, nOut, alg, pBuf, nBuf, type, compress)                               \
  do {                                                                                                                \
    DEFINE_VAR(alg)                                                                                                   \
    if (l1 != L1_DISABLED && l2 == L2_DISABLED) {                                                                     \
      if (compress) {                                                                                                 \
        uTrace("encode:%s, compress:%s, level:%s, type:%s", compressL1Dict[l1].name, "disabled", "disabled",          \
               tDataTypes[type].name);                                                                                \
        return compressL1Dict[l1].comprFn(pIn, nEle, pOut, type);                                                     \
      } else {                                                                                                        \
        uTrace("dencode:%s, compress:%s, level:%s, type:%s", compressL1Dict[l1].name, "disabled", "disabled",         \
               tDataTypes[type].name);                                                                                \
        return compressL1Dict[l1].decomprFn(pIn, nIn, nEle, pOut, type);                                              \
      }                                                                                                               \
    } else if (l1 != L1_DISABLED && l2 != L2_DISABLED) {                                                              \
      if (compress) {                                                                                                 \
        uTrace("encode:%s, compress:%s, level:%d, type:%s, l1:%d", compressL1Dict[l1].name, compressL2Dict[l2].name,  \
               lvl, tDataTypes[type].name, l1);                                                                       \
        int32_t len = compressL1Dict[l1].comprFn(pIn, nEle, pBuf, type);                                              \
        if (len < 0) {                                                                                                \
          return len;                                                                                                 \
        }                                                                                                             \
        int8_t alvl = tsGetCompressL2Level(l2, lvl);                                                                  \
        return compressL2Dict[l2].comprFn(pBuf, len, pOut, nOut, type, alvl);                                         \
      } else {                                                                                                        \
        uTrace("dencode:%s, decompress:%s, level:%d, type:%s", compressL1Dict[l1].name, compressL2Dict[l2].name, lvl, \
               tDataTypes[type].name);                                                                                \
        int32_t bufLen = compressL2Dict[l2].decomprFn(pIn, nIn, pBuf, nBuf, type);                                    \
        if (bufLen < 0) return -1;                                                                                    \
        return compressL1Dict[l1].decomprFn(pBuf, bufLen, nEle, pOut, type);                                          \
      }                                                                                                               \
    } else if (l1 == L1_DISABLED && l2 != L2_DISABLED) {                                                              \
      if (compress) {                                                                                                 \
        uTrace("encode:%s, compress:%s, level:%d, type:%s", "disabled", compressL2Dict[l2].name, lvl,                 \
               tDataTypes[type].name);                                                                                \
        int8_t alvl = tsGetCompressL2Level(l2, lvl);                                                                  \
        return compressL2Dict[l2].comprFn(pIn, nIn, pOut, nOut, type, alvl);                                          \
      } else {                                                                                                        \
        uTrace("dencode:%s, decompress:%s, level:%d, type:%s", "disabled", compressL2Dict[l2].name, lvl,              \
               tDataTypes[type].name);                                                                                \
        return compressL2Dict[l2].decomprFn(pIn, nIn, pOut, nOut, type);                                              \
      }                                                                                                               \
    }                                                                                                                 \
    return TSDB_CODE_INVALID_PARA;                                                                                    \
  } while (1)
1647

1648
/*************************************************************************
1649
 *                  REGULAR COMPRESSION 2
1650
 *************************************************************************/
1651
// Timestamp =====================================================
1652
int32_t tsCompressTimestamp2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg,
260,136✔
1653
                             void *pBuf, int32_t nBuf) {
1654
  FUNC_COMPRESS_IMPL(pIn, nIn, nEle, pOut, nOut, cmprAlg, pBuf, nBuf, TSDB_DATA_TYPE_TIMESTAMP, 1);
260,136!
1655
}
1656

1657
int32_t tsDecompressTimestamp2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg,
2,146,934✔
1658
                               void *pBuf, int32_t nBuf) {
1659
  FUNC_COMPRESS_IMPL(pIn, nIn, nEle, pOut, nOut, cmprAlg, pBuf, nBuf, TSDB_DATA_TYPE_TIMESTAMP, 0);
2,146,934!
1660
}
1661

1662
// Float =====================================================
1663
int32_t tsCompressFloat2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf,
181,217✔
1664
                         int32_t nBuf) {
1665
  DEFINE_VAR(cmprAlg)
181,217✔
1666
  if (l2 == L2_TSZ && lvl != 0 && lossyFloat) {
181,217!
1667
    return tsCompressFloatLossyImp(pIn, nEle, pOut);
×
1668
  }
1669
  FUNC_COMPRESS_IMPL(pIn, nIn, nEle, pOut, nOut, cmprAlg, pBuf, nBuf, TSDB_DATA_TYPE_FLOAT, 1);
181,217!
1670
}
1671

1672
int32_t tsDecompressFloat2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf,
438,603✔
1673
                           int32_t nBuf) {
1674
  DEFINE_VAR(cmprAlg)
438,603✔
1675
  if (lvl != 0 && HEAD_ALGO(((uint8_t *)pIn)[0]) == ALGO_SZ_LOSSY) {
438,603!
1676
    return tsDecompressFloatLossyImp(pIn, nIn, nEle, pOut);
×
1677
  }
1678
  FUNC_COMPRESS_IMPL(pIn, nIn, nEle, pOut, nOut, cmprAlg, pBuf, nBuf, TSDB_DATA_TYPE_FLOAT, 0);
438,603!
1679
}
1680

1681
// Double =====================================================
1682
int32_t tsCompressDouble2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf,
115,181✔
1683
                          int32_t nBuf) {
1684
  DEFINE_VAR(cmprAlg)
115,181✔
1685
  if (l2 == L2_TSZ && lvl != 0 && lossyDouble) {
115,181!
1686
    // lossy mode
1687
    return tsCompressDoubleLossyImp(pIn, nEle, pOut);
×
1688
  }
1689
  FUNC_COMPRESS_IMPL(pIn, nIn, nEle, pOut, nOut, cmprAlg, pBuf, nBuf, TSDB_DATA_TYPE_DOUBLE, 1);
115,181!
1690
}
1691

1692
int32_t tsDecompressDouble2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg,
370,223✔
1693
                            void *pBuf, int32_t nBuf) {
1694
  DEFINE_VAR(cmprAlg)
370,223✔
1695
  if (lvl != 0 && HEAD_ALGO(((uint8_t *)pIn)[0]) == ALGO_SZ_LOSSY) {
370,223!
1696
    // decompress lossy
1697
    return tsDecompressDoubleLossyImp(pIn, nIn, nEle, pOut);
×
1698
  }
1699
  FUNC_COMPRESS_IMPL(pIn, nIn, nEle, pOut, nOut, cmprAlg, pBuf, nBuf, TSDB_DATA_TYPE_DOUBLE, 0);
370,223!
1700
}
1701

1702
// Binary =====================================================
1703
int32_t tsCompressString2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf,
361,717✔
1704
                          int32_t nBuf) {
1705
  DEFINE_VAR(cmprAlg)
361,717✔
1706
  if (l2 == L2_DISABLED) {
361,717!
1707
    l2 = 0;
×
1708
  }
1709
  return compressL2Dict[l2].comprFn(pIn, nIn, pOut, nOut, TSDB_DATA_TYPE_BINARY, lvl);
361,717✔
1710
}
1711

1712
int32_t tsDecompressString2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg,
1,073,128✔
1713
                            void *pBuf, int32_t nBuf) {
1714
  // return 0;
1715
  DEFINE_VAR(cmprAlg)
1,073,128✔
1716
  if (l2 == L2_DISABLED) {
1,073,128!
1717
    l2 = 0;
×
1718
  }
1719
  return compressL2Dict[l2].decomprFn(pIn, nIn, pOut, nOut, TSDB_DATA_TYPE_BINARY);
1,073,128✔
1720
}
1721

1722
// Bool =====================================================
1723
int32_t tsCompressBool2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf,
165,152✔
1724
                        int32_t nBuf) {
1725
  uint32_t tCmprAlg = 0;
165,152✔
1726
  DEFINE_VAR(cmprAlg)
165,152✔
1727
  if (l1 != L1_RLE) {
165,152!
1728
    SET_COMPRESS(L1_RLE, l2, lvl, tCmprAlg);
×
1729
  } else {
1730
    tCmprAlg = cmprAlg;
165,152✔
1731
  }
1732
  FUNC_COMPRESS_IMPL(pIn, nIn, nEle, pOut, nOut, tCmprAlg, pBuf, nBuf, TSDB_DATA_TYPE_BOOL, 1);
165,152!
1733
}
1734

1735
int32_t tsDecompressBool2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf,
170,215✔
1736
                          int32_t nBuf) {
1737
  uint32_t tCmprAlg = 0;
170,215✔
1738
  DEFINE_VAR(cmprAlg)
170,215✔
1739
  if (l1 != L1_RLE) {
170,215!
1740
    SET_COMPRESS(L1_RLE, l2, lvl, tCmprAlg);
×
1741
  } else {
1742
    tCmprAlg = cmprAlg;
170,215✔
1743
  }
1744
  FUNC_COMPRESS_IMPL(pIn, nIn, nEle, pOut, nOut, tCmprAlg, pBuf, nBuf, TSDB_DATA_TYPE_BOOL, 0);
170,215!
1745
}
1746

1747
// Tinyint =====================================================
1748
int32_t tsCompressTinyint2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf,
567,995✔
1749
                           int32_t nBuf) {
1750
  uint32_t tCmprAlg = 0;
567,995✔
1751
  DEFINE_VAR(cmprAlg)
567,995✔
1752
  if (l1 != L1_SIMPLE_8B) {
567,995✔
1753
    SET_COMPRESS(L1_SIMPLE_8B, l2, lvl, tCmprAlg);
175,437✔
1754
  } else {
1755
    tCmprAlg = cmprAlg;
392,558✔
1756
  }
1757
  FUNC_COMPRESS_IMPL(pIn, nIn, nEle, pOut, nOut, tCmprAlg, pBuf, nBuf, TSDB_DATA_TYPE_TINYINT, 1);
567,995!
1758
}
1759

1760
int32_t tsDecompressTinyint2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg,
1,575,496✔
1761
                             void *pBuf, int32_t nBuf) {
1762
  uint32_t tCmprAlg = 0;
1,575,496✔
1763
  DEFINE_VAR(cmprAlg)
1,575,496✔
1764
  if (l1 != L1_SIMPLE_8B) {
1,575,496✔
1765
    SET_COMPRESS(L1_SIMPLE_8B, l2, lvl, tCmprAlg);
516,636✔
1766
  } else {
1767
    tCmprAlg = cmprAlg;
1,058,860✔
1768
  }
1769
  FUNC_COMPRESS_IMPL(pIn, nIn, nEle, pOut, nOut, tCmprAlg, pBuf, nBuf, TSDB_DATA_TYPE_TINYINT, 0);
1,575,496!
1770
}
1771

1772
// Smallint =====================================================
1773
int32_t tsCompressSmallint2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg,
184,580✔
1774
                            void *pBuf, int32_t nBuf) {
1775
  uint32_t tCmprAlg = 0;
184,580✔
1776
  DEFINE_VAR(cmprAlg)
184,580✔
1777
  if (l1 != L1_SIMPLE_8B) {
184,580!
1778
    SET_COMPRESS(L1_SIMPLE_8B, l2, lvl, tCmprAlg);
×
1779
  } else {
1780
    tCmprAlg = cmprAlg;
184,580✔
1781
  }
1782
  FUNC_COMPRESS_IMPL(pIn, nIn, nEle, pOut, nOut, tCmprAlg, pBuf, nBuf, TSDB_DATA_TYPE_SMALLINT, 1);
184,580!
1783
}
1784

1785
int32_t tsDecompressSmallint2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg,
455,974✔
1786
                              void *pBuf, int32_t nBuf) {
1787
  uint32_t tCmprAlg = 0;
455,974✔
1788
  DEFINE_VAR(cmprAlg)
455,974✔
1789
  if (l1 != L1_SIMPLE_8B) {
455,974!
1790
    SET_COMPRESS(L1_SIMPLE_8B, l2, lvl, tCmprAlg);
×
1791
  } else {
1792
    tCmprAlg = cmprAlg;
455,974✔
1793
  }
1794
  FUNC_COMPRESS_IMPL(pIn, nIn, nEle, pOut, nOut, tCmprAlg, pBuf, nBuf, TSDB_DATA_TYPE_SMALLINT, 0);
455,974!
1795
}
1796

1797
// Int =====================================================
1798
int32_t tsCompressInt2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf,
667,247✔
1799
                       int32_t nBuf) {
1800
  uint32_t tCmprAlg = 0;
667,247✔
1801
  DEFINE_VAR(cmprAlg)
667,247✔
1802
  if (l1 != L1_SIMPLE_8B) {
667,247✔
1803
    SET_COMPRESS(L1_SIMPLE_8B, l2, lvl, tCmprAlg);
361,746✔
1804
  } else {
1805
    tCmprAlg = cmprAlg;
305,501✔
1806
  }
1807

1808
  FUNC_COMPRESS_IMPL(pIn, nIn, nEle, pOut, nOut, tCmprAlg, pBuf, nBuf, TSDB_DATA_TYPE_INT, 1);
667,247!
1809
}
1810

1811
int32_t tsDecompressInt2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf,
1,734,336✔
1812
                         int32_t nBuf) {
1813
  uint32_t tCmprAlg = 0;
1,734,336✔
1814
  DEFINE_VAR(cmprAlg)
1,734,336✔
1815
  if (l1 != L1_SIMPLE_8B) {
1,734,336✔
1816
    SET_COMPRESS(L1_SIMPLE_8B, l2, lvl, tCmprAlg);
1,073,266✔
1817
  } else {
1818
    tCmprAlg = cmprAlg;
661,070✔
1819
  }
1820
  FUNC_COMPRESS_IMPL(pIn, nIn, nEle, pOut, nOut, tCmprAlg, pBuf, nBuf, TSDB_DATA_TYPE_INT, 0);
1,734,336!
1821
}
1822

1823
// Bigint =====================================================
1824
int32_t tsCompressBigint2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf,
505,163✔
1825
                          int32_t nBuf) {
1826
  FUNC_COMPRESS_IMPL(pIn, nIn, nEle, pOut, nOut, cmprAlg, pBuf, nBuf, TSDB_DATA_TYPE_BIGINT, 1);
505,163!
1827
}
1828

1829
int32_t tsDecompressBigint2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg,
3,982,145✔
1830
                            void *pBuf, int32_t nBuf) {
1831
  FUNC_COMPRESS_IMPL(pIn, nIn, nEle, pOut, nOut, cmprAlg, pBuf, nBuf, TSDB_DATA_TYPE_BIGINT, 0);
3,982,145!
1832
}
1833

1834
void tcompressDebug(uint32_t cmprAlg, uint8_t *l1Alg, uint8_t *l2Alg, uint8_t *level) {
2,296,666✔
1835
  DEFINE_VAR(cmprAlg)
2,296,666✔
1836
  *l1Alg = l1;
2,296,666✔
1837
  *l2Alg = l2;
2,296,666✔
1838
  *level = lvl;
2,296,666✔
1839
  return;
2,296,666✔
1840
}
1841

1842
int8_t tUpdateCompress(uint32_t oldCmpr, uint32_t newCmpr, uint8_t l2Disabled, uint8_t lvlDiabled, uint8_t lvlDefault,
29✔
1843

1844
                       uint32_t *dst) {
1845
  int8_t  update = 0;
29✔
1846
  uint8_t ol1 = COMPRESS_L1_TYPE_U32(oldCmpr);
29✔
1847
  uint8_t ol2 = COMPRESS_L2_TYPE_U32(oldCmpr);
29✔
1848
  uint8_t olvl = COMPRESS_L2_TYPE_LEVEL_U32(oldCmpr);
29✔
1849

1850
  uint8_t nl1 = COMPRESS_L1_TYPE_U32(newCmpr);
29✔
1851
  uint8_t nl2 = COMPRESS_L2_TYPE_U32(newCmpr);
29✔
1852
  uint8_t nlvl = COMPRESS_L2_TYPE_LEVEL_U32(newCmpr);
29✔
1853

1854
  // nl1 == 0, not update encode
1855
  // nl2 == 0, not update compress
1856
  // nl3 == 0, not update level
1857
  if (nl1 != 0 && ol1 != nl1) {
29!
1858
    SET_COMPRESS(nl1, ol2, olvl, *dst);
×
1859
    update = 1;
×
1860
    ol1 = nl1;
×
1861
  }
1862

1863
  if (nl2 != 0 && ol2 != nl2) {
29✔
1864
    if (nl2 == l2Disabled) {
16✔
1865
      SET_COMPRESS(ol1, nl2, lvlDiabled, *dst);
2✔
1866
    } else {
1867
      if (ol2 == l2Disabled) {
14✔
1868
        SET_COMPRESS(ol1, nl2, lvlDefault, *dst);
2✔
1869
      } else {
1870
        SET_COMPRESS(ol1, nl2, olvl, *dst);
12✔
1871
      }
1872
    }
1873
    update = 1;
16✔
1874
    ol2 = nl2;
16✔
1875
  }
1876

1877
  if (nlvl != 0 && olvl != nlvl) {
29✔
1878
    if (update == 0) {
9✔
1879
      if (ol2 == L2_DISABLED) {
6!
1880
        update = -1;
×
1881
        return update;
×
1882
      }
1883
    }
1884
    SET_COMPRESS(ol1, ol2, nlvl, *dst);
9✔
1885
    update = 1;
9✔
1886
  }
1887

1888
  return update;
29✔
1889
}
1890

1891
int32_t getWordLength(char type) {
19,290,144✔
1892
  int32_t wordLength = 0;
19,290,144✔
1893
  switch (type) {
19,290,144!
1894
    case TSDB_DATA_TYPE_BIGINT:
12,879,833✔
1895
      wordLength = LONG_BYTES;
12,879,833✔
1896
      break;
12,879,833✔
1897
    case TSDB_DATA_TYPE_INT:
3,636,604✔
1898
      wordLength = INT_BYTES;
3,636,604✔
1899
      break;
3,636,604✔
1900
    case TSDB_DATA_TYPE_SMALLINT:
640,675✔
1901
      wordLength = SHORT_BYTES;
640,675✔
1902
      break;
640,675✔
1903
    case TSDB_DATA_TYPE_TINYINT:
2,143,856✔
1904
      wordLength = CHAR_BYTES;
2,143,856✔
1905
      break;
2,143,856✔
1906
    default:
×
1907
      uError("Invalid decompress integer type:%d", type);
×
1908
      return TSDB_CODE_INVALID_PARA;
×
1909
  }
1910

1911
  return wordLength;
19,300,968✔
1912
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc