• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #9756

pending completion
#9756

push

travis_ci

web-flow
[To rel/1.2] [IOTDB-6096] Make M4 Function do the nullable judgement

157 of 157 new or added lines in 3 files covered. (100.0%)

79370 of 165647 relevant lines covered (47.92%)

0.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

10.63
/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/UDTFM4.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.iotdb.commons.udf.builtin;
21

22
import org.apache.iotdb.commons.exception.MetadataException;
23
import org.apache.iotdb.commons.udf.utils.UDFDataTypeTransformer;
24
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
25
import org.apache.iotdb.udf.api.UDTF;
26
import org.apache.iotdb.udf.api.access.RowWindow;
27
import org.apache.iotdb.udf.api.collector.PointCollector;
28
import org.apache.iotdb.udf.api.customizer.config.UDTFConfigurations;
29
import org.apache.iotdb.udf.api.customizer.parameter.UDFParameterValidator;
30
import org.apache.iotdb.udf.api.customizer.parameter.UDFParameters;
31
import org.apache.iotdb.udf.api.customizer.strategy.SlidingSizeWindowAccessStrategy;
32
import org.apache.iotdb.udf.api.customizer.strategy.SlidingTimeWindowAccessStrategy;
33
import org.apache.iotdb.udf.api.exception.UDFException;
34
import org.apache.iotdb.udf.api.exception.UDFInputSeriesDataTypeNotValidException;
35
import org.apache.iotdb.udf.api.exception.UDFParameterNotValidException;
36
import org.apache.iotdb.udf.api.type.Type;
37

38
import java.io.IOException;
39

40
/**
41
 * For each sliding window, M4 returns the first, last, bottom, top points. The window can be
42
 * controlled by either point size or time interval length. The aggregated points in the output
43
 * series has been sorted and deduplicated.
44
 *
45
 * <p>SlidingSizeWindow usage Example: "select M4(s1,'windowSize'='10','slidingStep'='10') from
46
 * root.vehicle.d1" (windowSize is required, slidingStep is optional.)
47
 *
48
 * <p>SlidingTimeWindow usage Example: "select
49
 * M4(s1,'timeInterval'='25','slidingStep'='25','displayWindowBegin'='0','displayWindowEnd'='100')
50
 * from root.vehicle.d1" (timeInterval is required, slidingStep/displayWindowBegin/displayWindowEnd
51
 * are optional.)
52
 */
53
public class UDTFM4 implements UDTF {
1✔
54

55
  enum AccessStrategy {
1✔
56
    SIZE_WINDOW,
1✔
57
    TIME_WINDOW
1✔
58
  }
59

60
  protected AccessStrategy accessStrategy;
61
  protected TSDataType dataType;
62

63
  public static final String WINDOW_SIZE_KEY = "windowSize";
64
  public static final String TIME_INTERVAL_KEY = "timeInterval";
65
  public static final String SLIDING_STEP_KEY = "slidingStep";
66
  public static final String DISPLAY_WINDOW_BEGIN_KEY = "displayWindowBegin";
67
  public static final String DISPLAY_WINDOW_END_KEY = "displayWindowEnd";
68

69
  @Override
70
  public void validate(UDFParameterValidator validator) throws UDFException {
71
    validator
1✔
72
        .validateInputSeriesNumber(1)
1✔
73
        .validateInputSeriesDataType(0, Type.INT32, Type.INT64, Type.FLOAT, Type.DOUBLE);
1✔
74

75
    if (!validator.getParameters().hasAttribute(WINDOW_SIZE_KEY)
1✔
76
        && !validator.getParameters().hasAttribute(TIME_INTERVAL_KEY)) {
×
77
      throw new UDFParameterNotValidException(
×
78
          String.format(
×
79
              "attribute \"%s\"/\"%s\" is required but was not provided.",
80
              WINDOW_SIZE_KEY, TIME_INTERVAL_KEY));
81
    }
82
    if (validator.getParameters().hasAttribute(WINDOW_SIZE_KEY)
1✔
83
        && validator.getParameters().hasAttribute(TIME_INTERVAL_KEY)) {
1✔
84
      throw new UDFParameterNotValidException(
×
85
          String.format(
×
86
              "use attribute \"%s\" or \"%s\" only one at a time.",
87
              WINDOW_SIZE_KEY, TIME_INTERVAL_KEY));
88
    }
89
    if (validator.getParameters().hasAttribute(WINDOW_SIZE_KEY)) {
1✔
90
      accessStrategy = AccessStrategy.SIZE_WINDOW;
1✔
91
    } else {
92
      accessStrategy = AccessStrategy.TIME_WINDOW;
×
93
    }
94

95
    dataType =
1✔
96
        UDFDataTypeTransformer.transformToTsDataType(validator.getParameters().getDataType(0));
1✔
97
  }
1✔
98

99
  @Override
100
  public void beforeStart(UDFParameters parameters, UDTFConfigurations configurations)
101
      throws MetadataException {
102
    // set data type
103
    configurations.setOutputDataType(UDFDataTypeTransformer.transformToUDFDataType(dataType));
1✔
104

105
    // set access strategy
106
    if (accessStrategy == AccessStrategy.SIZE_WINDOW) {
1✔
107
      int windowSize = parameters.getInt(WINDOW_SIZE_KEY);
1✔
108
      int slidingStep = parameters.getIntOrDefault(SLIDING_STEP_KEY, windowSize);
1✔
109
      configurations.setAccessStrategy(
1✔
110
          new SlidingSizeWindowAccessStrategy(windowSize, slidingStep));
111
    } else {
1✔
112
      long timeInterval = parameters.getLong(TIME_INTERVAL_KEY);
×
113
      long displayWindowBegin =
×
114
          parameters.getLongOrDefault(DISPLAY_WINDOW_BEGIN_KEY, Long.MIN_VALUE);
×
115
      long displayWindowEnd = parameters.getLongOrDefault(DISPLAY_WINDOW_END_KEY, Long.MAX_VALUE);
×
116
      long slidingStep = parameters.getLongOrDefault(SLIDING_STEP_KEY, timeInterval);
×
117
      configurations.setAccessStrategy(
×
118
          new SlidingTimeWindowAccessStrategy(
119
              timeInterval, slidingStep, displayWindowBegin, displayWindowEnd));
120
    }
121
  }
1✔
122

123
  @Override
124
  public void transform(RowWindow rowWindow, PointCollector collector)
125
      throws UDFException, IOException {
126
    switch (dataType) {
×
127
      case INT32:
128
        transformInt(rowWindow, collector);
×
129
        break;
×
130
      case INT64:
131
        transformLong(rowWindow, collector);
×
132
        break;
×
133
      case FLOAT:
134
        transformFloat(rowWindow, collector);
×
135
        break;
×
136
      case DOUBLE:
137
        transformDouble(rowWindow, collector);
×
138
        break;
×
139
      default:
140
        // This will not happen
141
        throw new UDFInputSeriesDataTypeNotValidException(
×
142
            0,
143
            UDFDataTypeTransformer.transformToUDFDataType(dataType),
×
144
            Type.INT32,
145
            Type.INT64,
146
            Type.FLOAT,
147
            Type.DOUBLE);
148
    }
149
  }
×
150

151
  public void transformInt(RowWindow rowWindow, PointCollector collector) throws IOException {
152
    if (rowWindow.windowSize() > 0) { // else empty window do nothing
×
153
      int index = 0;
×
154
      int size = rowWindow.windowSize();
×
155
      int firstValueIndex = -1;
×
156
      int firstValue = 0;
×
157

158
      for (; index < size; index++) {
×
159
        if (!rowWindow.getRow(index).isNull(0)) {
×
160
          firstValueIndex = index;
×
161
          firstValue = rowWindow.getRow(index).getInt(0);
×
162
          break;
×
163
        }
164
      }
165

166
      if (firstValueIndex != -1) { // else empty window do nothing
×
167
        int lastValueIndex = firstValueIndex;
×
168
        int lastValue = firstValue;
×
169
        int minValueIndex = firstValueIndex;
×
170
        int minValue = firstValue;
×
171
        int maxValueIndex = firstValueIndex;
×
172
        int maxValue = firstValue;
×
173

174
        for (; index < size; index++) {
×
175
          if (!rowWindow.getRow(index).isNull(0)) {
×
176
            lastValueIndex = index;
×
177
            lastValue = rowWindow.getRow(index).getInt(0);
×
178
            if (lastValue < minValue) {
×
179
              minValue = lastValue;
×
180
              minValueIndex = index;
×
181
            }
182
            if (lastValue > maxValue) {
×
183
              maxValue = lastValue;
×
184
              maxValueIndex = index;
×
185
            }
186
          }
187
        }
188
        // first value
189
        collector.putInt(rowWindow.getRow(firstValueIndex).getTime(), firstValue);
×
190
        // min and max value if not duplicate
191
        // if min/max value is equal to first/last value, we keep first/last value
192
        int smallerIndex = Math.min(minValueIndex, maxValueIndex);
×
193
        int largerIndex = Math.max(minValueIndex, maxValueIndex);
×
194
        if (smallerIndex > firstValueIndex
×
195
            && rowWindow.getRow(smallerIndex).getInt(0) != lastValue) {
×
196
          collector.putInt(
×
197
              rowWindow.getRow(smallerIndex).getTime(), rowWindow.getRow(smallerIndex).getInt(0));
×
198
        }
199
        if (largerIndex > smallerIndex && rowWindow.getRow(largerIndex).getInt(0) != lastValue) {
×
200
          collector.putInt(
×
201
              rowWindow.getRow(largerIndex).getTime(), rowWindow.getRow(largerIndex).getInt(0));
×
202
        }
203
        // last value
204
        if (lastValueIndex > firstValueIndex) {
×
205
          collector.putInt(rowWindow.getRow(lastValueIndex).getTime(), lastValue);
×
206
        }
207
      }
208
    }
209
  }
×
210

211
  public void transformLong(RowWindow rowWindow, PointCollector collector) throws IOException {
212
    if (rowWindow.windowSize() > 0) { // else empty window do nothing
×
213
      int index = 0;
×
214
      int size = rowWindow.windowSize();
×
215
      int firstValueIndex = -1;
×
216
      long firstValue = 0;
×
217

218
      for (; index < size; index++) {
×
219
        if (!rowWindow.getRow(index).isNull(0)) {
×
220
          firstValueIndex = index;
×
221
          firstValue = rowWindow.getRow(index).getLong(0);
×
222
          break;
×
223
        }
224
      }
225

226
      if (firstValueIndex != -1) { // else empty window do nothing
×
227
        int lastValueIndex = firstValueIndex;
×
228
        long lastValue = firstValue;
×
229
        int minValueIndex = firstValueIndex;
×
230
        long minValue = firstValue;
×
231
        int maxValueIndex = firstValueIndex;
×
232
        long maxValue = firstValue;
×
233

234
        for (; index < size; index++) {
×
235
          if (!rowWindow.getRow(index).isNull(0)) {
×
236
            lastValueIndex = index;
×
237
            lastValue = rowWindow.getRow(index).getLong(0);
×
238
            if (lastValue < minValue) {
×
239
              minValue = lastValue;
×
240
              minValueIndex = index;
×
241
            }
242
            if (lastValue > maxValue) {
×
243
              maxValue = lastValue;
×
244
              maxValueIndex = index;
×
245
            }
246
          }
247
        }
248
        // first value
249
        collector.putLong(rowWindow.getRow(firstValueIndex).getTime(), firstValue);
×
250
        // min and max value if not duplicate
251
        // if min/max value is equal to first/last value, we keep first/last value
252
        int smallerIndex = Math.min(minValueIndex, maxValueIndex);
×
253
        int largerIndex = Math.max(minValueIndex, maxValueIndex);
×
254
        if (smallerIndex > firstValueIndex
×
255
            && rowWindow.getRow(smallerIndex).getLong(0) != lastValue) {
×
256
          collector.putLong(
×
257
              rowWindow.getRow(smallerIndex).getTime(), rowWindow.getRow(smallerIndex).getLong(0));
×
258
        }
259
        if (largerIndex > smallerIndex && rowWindow.getRow(largerIndex).getLong(0) != lastValue) {
×
260
          collector.putLong(
×
261
              rowWindow.getRow(largerIndex).getTime(), rowWindow.getRow(largerIndex).getLong(0));
×
262
        }
263
        // last value
264
        if (lastValueIndex > firstValueIndex) {
×
265
          collector.putLong(rowWindow.getRow(lastValueIndex).getTime(), lastValue);
×
266
        }
267
      }
268
    }
269
  }
×
270

271
  public void transformFloat(RowWindow rowWindow, PointCollector collector) throws IOException {
272
    if (rowWindow.windowSize() > 0) { // else empty window do nothing
×
273
      int index = 0;
×
274
      int size = rowWindow.windowSize();
×
275
      int firstValueIndex = -1;
×
276
      float firstValue = 0;
×
277

278
      for (; index < size; index++) {
×
279
        if (!rowWindow.getRow(index).isNull(0)) {
×
280
          firstValueIndex = index;
×
281
          firstValue = rowWindow.getRow(index).getFloat(0);
×
282
          break;
×
283
        }
284
      }
285

286
      if (firstValueIndex != -1) { // else empty window do nothing
×
287
        int lastValueIndex = firstValueIndex;
×
288
        float lastValue = firstValue;
×
289
        int minValueIndex = firstValueIndex;
×
290
        float minValue = firstValue;
×
291
        int maxValueIndex = firstValueIndex;
×
292
        float maxValue = firstValue;
×
293

294
        for (; index < size; index++) {
×
295
          if (!rowWindow.getRow(index).isNull(0)) {
×
296
            lastValueIndex = index;
×
297
            lastValue = rowWindow.getRow(index).getFloat(0);
×
298
            if (lastValue < minValue) {
×
299
              minValue = lastValue;
×
300
              minValueIndex = index;
×
301
            }
302
            if (lastValue > maxValue) {
×
303
              maxValue = lastValue;
×
304
              maxValueIndex = index;
×
305
            }
306
          }
307
        }
308
        // first value
309
        collector.putFloat(rowWindow.getRow(firstValueIndex).getTime(), firstValue);
×
310
        // min and max value if not duplicate
311
        // if min/max value is equal to first/last value, we keep first/last value
312
        int smallerIndex = Math.min(minValueIndex, maxValueIndex);
×
313
        int largerIndex = Math.max(minValueIndex, maxValueIndex);
×
314
        if (smallerIndex > firstValueIndex
×
315
            && rowWindow.getRow(smallerIndex).getFloat(0) != lastValue) {
×
316
          collector.putFloat(
×
317
              rowWindow.getRow(smallerIndex).getTime(), rowWindow.getRow(smallerIndex).getFloat(0));
×
318
        }
319
        if (largerIndex > smallerIndex && rowWindow.getRow(largerIndex).getFloat(0) != lastValue) {
×
320
          collector.putFloat(
×
321
              rowWindow.getRow(largerIndex).getTime(), rowWindow.getRow(largerIndex).getFloat(0));
×
322
        }
323
        // last value
324
        if (lastValueIndex > firstValueIndex) {
×
325
          collector.putFloat(rowWindow.getRow(lastValueIndex).getTime(), lastValue);
×
326
        }
327
      }
328
    }
329
  }
×
330

331
  public void transformDouble(RowWindow rowWindow, PointCollector collector) throws IOException {
332
    if (rowWindow.windowSize() > 0) { // else empty window do nothing
×
333
      int index = 0;
×
334
      int size = rowWindow.windowSize();
×
335
      int firstValueIndex = -1;
×
336
      double firstValue = 0;
×
337

338
      for (; index < size; index++) {
×
339
        if (!rowWindow.getRow(index).isNull(0)) {
×
340
          firstValueIndex = index;
×
341
          firstValue = rowWindow.getRow(index).getDouble(0);
×
342
          break;
×
343
        }
344
      }
345

346
      if (firstValueIndex != -1) { // else empty window do nothing
×
347
        int lastValueIndex = firstValueIndex;
×
348
        double lastValue = firstValue;
×
349
        int minValueIndex = firstValueIndex;
×
350
        double minValue = firstValue;
×
351
        int maxValueIndex = firstValueIndex;
×
352
        double maxValue = firstValue;
×
353

354
        for (; index < size; index++) {
×
355
          if (!rowWindow.getRow(index).isNull(0)) {
×
356
            lastValueIndex = index;
×
357
            lastValue = rowWindow.getRow(index).getDouble(0);
×
358
            if (lastValue < minValue) {
×
359
              minValue = lastValue;
×
360
              minValueIndex = index;
×
361
            }
362
            if (lastValue > maxValue) {
×
363
              maxValue = lastValue;
×
364
              maxValueIndex = index;
×
365
            }
366
          }
367
        }
368
        // first value
369
        collector.putDouble(rowWindow.getRow(firstValueIndex).getTime(), firstValue);
×
370
        // min and max value if not duplicate
371
        // if min/max value is equal to first/last value, we keep first/last value
372
        int smallerIndex = Math.min(minValueIndex, maxValueIndex);
×
373
        int largerIndex = Math.max(minValueIndex, maxValueIndex);
×
374
        if (smallerIndex > firstValueIndex
×
375
            && rowWindow.getRow(smallerIndex).getDouble(0) != lastValue) {
×
376
          collector.putDouble(
×
377
              rowWindow.getRow(smallerIndex).getTime(),
×
378
              rowWindow.getRow(smallerIndex).getDouble(0));
×
379
        }
380
        if (largerIndex > smallerIndex && rowWindow.getRow(largerIndex).getDouble(0) != lastValue) {
×
381
          collector.putDouble(
×
382
              rowWindow.getRow(largerIndex).getTime(), rowWindow.getRow(largerIndex).getDouble(0));
×
383
        }
384
        // last value
385
        if (lastValueIndex > firstValueIndex) {
×
386
          collector.putDouble(rowWindow.getRow(lastValueIndex).getTime(), lastValue);
×
387
        }
388
      }
389
    }
390
  }
×
391
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc