• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4720

08 Sep 2025 08:43AM UTC coverage: 58.139% (-0.6%) from 58.762%
#4720

push

travis-ci

web-flow
Merge pull request #32881 from taosdata/enh/add-new-windows-ci

fix(ci): update workflow reference to use new Windows CI YAML

133181 of 292179 branches covered (45.58%)

Branch coverage included in aggregate %.

201691 of 283811 relevant lines covered (71.07%)

5442780.71 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.34
/source/util/src/tdigest.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
/*
17
 * src/tdigest.c
18
 *
19
 * Implementation of the t-digest data structure used to compute accurate percentiles.
20
 *
21
 * It is based on the MergingDigest implementation found at:
22
 *   https://github.com/tdunning/t-digest/blob/master/src/main/java/com/tdunning/math/stats/MergingDigest.java
23
 *
24
 * Copyright (c) 2016, Usman Masood <usmanm at fastmail dot fm>
25
 */
26

27
#include "tdigest.h"
28
#include "os.h"
29
#include "osMath.h"
30
#include "tlog.h"
31

32
/* Position of x along the segment [x0, x1] as a fraction (use with floating-point operands). */
#define INTERPOLATE(x, x0, x1) (((x) - (x0)) / ((x1) - (x0)))

/*
 * Scale function k(q) = compression * (asin(2q - 1) / pi + 1/2).
 * It maps q = 0 to 0 and q = 1 to compression; mergeCentroid() uses it to
 * decide when the current centroid is "full".
 */
#define INTEGRATED_LOCATION(compression, q) ((compression) * (asin(2.0 * (double)(q) - 1.0) / M_PI + 0.5))

/* Approximate equality using an absolute tolerance of FLT_EPSILON. */
#define FLOAT_EQ(f1, f2) (fabs((f1) - (f2)) <= FLT_EPSILON)
36

37
typedef struct SMergeArgs {
38
  TDigest   *t;
39
  SCentroid *centroids;
40
  int32_t    idx;
41
  double     weight_so_far;
42
  double     k1;
43
  double     min;
44
  double     max;
45
} SMergeArgs;
46

47
/*
 * Points t->centroids and t->buffered_pts into the memory that directly
 * follows the TDigest header: first GET_CENTROID(compression) centroids, then
 * the buffered-point area.
 * NOTE(review): assumes the allocation behind t was sized with
 * TDIGEST_SIZE(compression), as tdigestNewFrom() does; confirm at other call sites.
 */
void tdigestAutoFill(TDigest *t, int32_t compression) {
  char   *payload = (char *)t + sizeof(TDigest);
  int32_t centroidSlots = (int32_t)GET_CENTROID(compression);

  t->centroids = (SCentroid *)payload;
  t->buffered_pts = (SPt *)(payload + sizeof(SCentroid) * centroidSlots);
}
51

52
TDigest *tdigestNewFrom(void *pBuf, int32_t compression) {
11,431✔
53
  memset(pBuf, 0, (size_t)TDIGEST_SIZE(compression));
11,431✔
54
  TDigest *t = (TDigest *)pBuf;
11,431✔
55
  tdigestAutoFill(t, compression);
11,431✔
56

57
  t->compression = compression;
11,431✔
58
  t->size = (int64_t)GET_CENTROID(compression);
11,431✔
59
  t->threshold = (int32_t)GET_THRESHOLD(compression);
11,431✔
60
  t->min = DOUBLE_MAX;
11,431✔
61
  t->max = -DOUBLE_MAX;
11,431✔
62

63
  return t;
11,431✔
64
}
65

66
static int32_t cmpCentroid(const void *a, const void *b) {
12,630✔
67
  SCentroid *c1 = (SCentroid *)a;
12,630✔
68
  SCentroid *c2 = (SCentroid *)b;
12,630✔
69
  if (c1->mean < c2->mean) return -1;
12,630✔
70
  if (c1->mean > c2->mean) return 1;
2,280✔
71
  return 0;
1,200✔
72
}
73

74
/*
 * Folds one input centroid (`merge`, visited in ascending-mean order) into the
 * output array args->centroids.
 *
 * When the scale-function value k2 has advanced more than one unit past
 * args->k1 and the current output centroid is non-empty, a new output
 * centroid is started. This happens only if capacity (args->t->size) remains
 * and the incoming mean differs from the current one. The input's weight is
 * then added to the current output centroid, whose mean moves towards
 * merge->mean. Every positively weighted input also updates args->min and
 * args->max.
 */
static void mergeCentroid(SMergeArgs *args, SCentroid *merge) {
  SCentroid *c = &args->centroids[args->idx];

  args->weight_so_far += merge->weight;
  double k2 = INTEGRATED_LOCATION(args->t->size, args->weight_so_far / args->t->total_weight);

  if (k2 - args->k1 > 1 && c->weight > 0) {
    if (args->idx + 1 < args->t->size && merge->mean != c->mean) {
      args->idx++;
    }
    args->k1 = k2;
  }

  c = &args->centroids[args->idx];
  c->weight += merge->weight;
  if (c->mean != merge->mean) {
    // Incremental weighted mean: shift towards merge->mean by its share of the new weight.
    c->mean += (merge->mean - c->mean) * merge->weight / c->weight;
  }

  // Extrema are tracked in both branches. Output centroids start zeroed (mean 0.0),
  // so a sample equal to that mean takes the "equal" path above and would
  // otherwise never reach min/max. For example, data {0, 5} would report min == 5.
  if (merge->weight > 0) {
    args->min = TMIN(merge->mean, args->min);
    args->max = TMAX(merge->mean, args->max);
  }
}
101

102
/*
 * Merges all buffered points of t into its centroid array.
 *
 * The buffered points are turned into single-point centroids and sorted by
 * mean. They are then merged with t's existing centroids (assumed to be
 * ordered by mean) through mergeCentroid(), and the result replaces
 * t->centroids. t->total_weight, t->num_centroids, t->min and t->max are
 * updated accordingly.
 *
 * Returns 0 on success (including when nothing is buffered), or terrno if a
 * scratch allocation fails. On failure the digest is left unmodified.
 */
int32_t tdigestCompress(TDigest *t) {
  int32_t num_unmerged = t->num_buffered_pts;
  if (num_unmerged <= 0) return 0;

  // Acquire both scratch buffers before touching the digest. An allocation
  // failure must not discard the buffered points or inflate total_weight.
  SCentroid *unmerged_centroids = (SCentroid *)taosMemoryMalloc(sizeof(SCentroid) * num_unmerged);
  if (unmerged_centroids == NULL) {
    return terrno;
  }

  SMergeArgs args;
  memset(&args, 0, sizeof(SMergeArgs));
  args.centroids = (SCentroid *)taosMemoryMalloc((size_t)(sizeof(SCentroid) * t->size));
  if (args.centroids == NULL) {
    taosMemoryFree((void *)unmerged_centroids);
    return terrno;
  }
  memset(args.centroids, 0, (size_t)(sizeof(SCentroid) * t->size));

  // Convert each buffered point into a one-point centroid.
  int64_t unmerged_weight = 0;
  for (int32_t k = 0; k < num_unmerged; k++) {
    SPt       *p = t->buffered_pts + k;
    SCentroid *c = &unmerged_centroids[k];
    c->mean = p->value;
    c->weight = p->weight;
    unmerged_weight += c->weight;
  }
  t->num_buffered_pts = 0;
  t->total_weight += unmerged_weight;

  taosSort(unmerged_centroids, num_unmerged, sizeof(SCentroid), cmpCentroid);

  args.t = t;
  args.min = DOUBLE_MAX;
  args.max = -DOUBLE_MAX;

  // Two-way merge of the new centroids with the existing ones, by ascending mean.
  int32_t i = 0;
  int32_t j = 0;
  while (i < num_unmerged && j < t->num_centroids) {
    SCentroid *a = &unmerged_centroids[i];
    SCentroid *b = &t->centroids[j];

    if (a->mean <= b->mean) {
      mergeCentroid(&args, a);
      i++;
    } else {
      mergeCentroid(&args, b);
      j++;
    }
  }

  while (i < num_unmerged) {
    mergeCentroid(&args, &unmerged_centroids[i++]);
  }
  taosMemoryFree((void *)unmerged_centroids);

  while (j < t->num_centroids) {
    mergeCentroid(&args, &t->centroids[j++]);
  }

  if (t->total_weight > 0) {
    t->min = TMIN(t->min, args.min);
    // The slot at args.idx may have been reserved but never filled.
    if (args.centroids[args.idx].weight <= 0) {
      args.idx--;
    }
    t->num_centroids = args.idx + 1;
    t->max = TMAX(t->max, args.max);
  }

  memcpy(t->centroids, args.centroids, sizeof(SCentroid) * t->num_centroids);
  taosMemoryFree((void *)args.centroids);
  return 0;
}
175

176
/*
 * Buffers one weighted sample (x, w) in t.
 *
 * A zero weight is ignored. If x equals the most recently buffered value, w is
 * added to that point's weight instead of using a new buffer slot. When the
 * number of buffered points reaches t->threshold, the buffer is compressed
 * into centroids.
 *
 * Returns 0 on success, or the error code from tdigestCompress().
 */
int32_t tdigestAdd(TDigest *t, double x, int64_t w) {
  if (w == 0) return 0;

  int32_t n = t->num_buffered_pts;
  if (n > 0 && t->buffered_pts[n - 1].value == x) {
    // Accumulate into the existing point (slot n - 1). Writing to slot n, which
    // is not counted in num_buffered_pts, would silently drop this weight.
    t->buffered_pts[n - 1].weight += w;
  } else {
    t->buffered_pts[n].value = x;
    t->buffered_pts[n].weight = w;
    t->num_buffered_pts++;
  }

  if (t->num_buffered_pts >= t->threshold) {
    return tdigestCompress(t);
  }
  return 0;
}
193

194
#if 0
/*
 * Estimates the fraction of total weight at or below x.
 * Currently compiled out (#if 0).
 *
 * Returns 0 for a NULL digest or x below t->min, 1 for x above t->max, and NAN
 * for an empty digest. With a single centroid it returns 0.5 (degenerate
 * range) or a linear interpolation across [min, max]; otherwise it
 * interpolates within the span around the centroid that covers x.
 */
double tdigestCDF(TDigest *t, double x) {
  if (t == NULL) {
    return 0;
  }

  // NOTE(review): the status of tdigestCompress() is ignored here, unlike in
  // tdigestQuantile(); handle it before re-enabling this function.
  tdigestCompress(t);

  if (t->num_centroids == 0) return NAN;
  if (x < t->min) return 0;
  if (x > t->max) return 1;
  if (t->num_centroids == 1) {
    return FLOAT_EQ(t->max, t->min) ? 0.5 : INTERPOLATE(x, t->min, t->max);
  }

  SCentroid  start = {.mean = t->min, .weight = 0};
  SCentroid *prev = &start;
  SCentroid *cur = &start;
  double     lo;
  double     hi = 0;
  int64_t    cum = 0;

  for (int32_t k = 0; k < t->num_centroids; k++) {
    lo = cur->mean - (prev->mean + hi);
    prev = cur;
    cur = &t->centroids[k];
    hi = (cur->mean - prev->mean) * prev->weight / (prev->weight + cur->weight);

    if (x < prev->mean + hi) {
      double cdf = (cum + prev->weight * INTERPOLATE(x, prev->mean - lo, prev->mean + hi)) / t->total_weight;
      return TMAX(cdf, 0.0);
    }
    cum += prev->weight;
  }

  lo = cur->mean - (prev->mean + hi);
  prev = cur;
  hi = t->max - prev->mean;

  if (x < prev->mean + hi) {
    return (cum + prev->weight * INTERPOLATE(x, prev->mean - lo, prev->mean + hi)) / t->total_weight;
  }
  return 1;
}
#endif
246

247
/*
 * Estimates the value at quantile q (a fraction, nominally in [0, 1]).
 *
 * Buffered points are compressed first. A compression failure is logged and
 * the estimate is computed from the centroids that already exist.
 *
 * Returns 0 for a NULL digest and NAN when there are no centroids. With a
 * single centroid it returns that centroid's mean. If q is within FLT_EPSILON
 * of 0 or 1 it returns t->min or t->max. Otherwise it walks the centroids and
 * interpolates linearly between segment boundaries, each boundary being a
 * weight-based blend of two adjacent centroid means.
 */
double tdigestQuantile(TDigest *t, double q) {
  if (t == NULL) {
    return 0;
  }

  if (tdigestCompress(t) != 0) {
    uError("failed to compress t-digest");
  }

  const int32_t numCentroids = t->num_centroids;
  if (numCentroids == 0) return NAN;
  if (numCentroids == 1) return t->centroids[0].mean;
  if (FLOAT_EQ(q, 0.0)) return t->min;
  if (FLOAT_EQ(q, 1.0)) return t->max;

  const double target = q * t->total_weight;  // rank being searched for
  int64_t      cumWeight = 0;                 // weight of centroids already passed

  // Zero-weight sentinel anchored at t->min, so the first segment starts at the minimum.
  SCentroid        sentinel = {.mean = t->min, .weight = 0};
  const SCentroid *prev = &sentinel;
  const SCentroid *cur = &sentinel;
  double           segLo = t->min;
  double           segHi = t->min;

  for (int32_t k = 0; k < numCentroids; k++) {
    prev = cur;
    cur = &t->centroids[k];
    segLo = segHi;
    segHi = (cur->weight * prev->mean + prev->weight * cur->mean) / (prev->weight + cur->weight);

    if (target < cumWeight + prev->weight) {
      const double frac = (target - cumWeight) / ((prev->weight == 0) ? 1 : prev->weight);
      return segLo * (1 - frac) + segHi * frac;
    }
    cumWeight += prev->weight;
  }

  // Past the last boundary: interpolate the final centroid towards t->max.
  prev = cur;
  segLo = segHi;
  segHi = t->max;
  if (prev->weight != 0 && target < cumWeight + prev->weight) {
    const double frac = (target - cumWeight) / prev->weight;
    return segLo * (1 - frac) + segHi * frac;
  }

  return t->max;
}
295

296
/*
 * Folds the contents of t2 into t1.
 *
 * t2's buffered points are added to t1 newest-first. t2->num_buffered_pts is
 * decremented after each one is accepted, so those points are consumed from
 * t2. Each of t2's centroids is then added to t1 as a weighted point; t2's
 * centroids themselves are not modified here.
 *
 * Returns 0 on success, or the first non-zero code returned by tdigestAdd().
 */
int32_t tdigestMerge(TDigest *t1, TDigest *t2) {
  // Buffered points, last to first.
  for (int32_t k = t2->num_buffered_pts; k > 0; k--) {
    const SPt *pt = t2->buffered_pts + (k - 1);
    int32_t    rc = tdigestAdd(t1, pt->value, pt->weight);
    if (rc != 0) {
      return rc;
    }
    t2->num_buffered_pts--;
  }

  // Centroids, first to last.
  for (int32_t k = 0; k < t2->num_centroids; ++k) {
    const SCentroid *cen = &t2->centroids[k];
    int32_t          rc = tdigestAdd(t1, cen->mean, cen->weight);
    if (rc != 0) {
      return rc;
    }
  }

  return 0;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc