apache / carbondata, build 2965 (push build, Jenkins CI, pending completion)
Merge 58e2cb11f into cdcea33bb (GitHub)

85 of 85 new or added lines in 9 files covered (100.0%)
62868 of 79380 relevant lines covered (79.2%)
1.05 hits per line
Source File (85.82% covered):
/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.carbondata.processing.merger;

import java.io.IOException;
import java.util.AbstractQueue;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.block.SegmentProperties;
import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
import org.apache.carbondata.core.datastore.row.CarbonRow;
import org.apache.carbondata.core.datastore.row.WriteStepRowUtil;
import org.apache.carbondata.core.indexstore.PartitionSpec;
import org.apache.carbondata.core.keygenerator.KeyGenException;
import org.apache.carbondata.core.metadata.SegmentFileStore;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
import org.apache.carbondata.core.scan.result.iterator.RawResultIterator;
import org.apache.carbondata.core.scan.wrappers.ByteArrayWrapper;
import org.apache.carbondata.core.util.ByteUtil;
import org.apache.carbondata.processing.exception.SliceMergerException;
import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
import org.apache.carbondata.processing.store.CarbonFactDataHandlerColumnar;
import org.apache.carbondata.processing.store.CarbonFactDataHandlerModel;
import org.apache.carbondata.processing.store.CarbonFactHandler;
import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;

import org.apache.log4j.Logger;

/**
 * This is the Merger class responsible for the merging of the segments.
 */
public class RowResultMergerProcessor extends AbstractResultProcessor {

  private CarbonFactHandler dataHandler;
  private SegmentProperties segprop;
  private CarbonLoadModel loadModel;
  private PartitionSpec partitionSpec;

  CarbonColumn[] noDicAndComplexColumns;
  /**
   * record holder heap
   */
  private AbstractQueue<RawResultIterator> recordHolderHeap;

  private static final Logger LOGGER =
      LogServiceFactory.getLogService(RowResultMergerProcessor.class.getName());

  public RowResultMergerProcessor(String databaseName,
      String tableName, SegmentProperties segProp, String[] tempStoreLocation,
      CarbonLoadModel loadModel, CompactionType compactionType, PartitionSpec partitionSpec)
      throws IOException {
    this.segprop = segProp;
    this.partitionSpec = partitionSpec;
    this.loadModel = loadModel;
    CarbonDataProcessorUtil.createLocations(tempStoreLocation);

    String carbonStoreLocation;
    if (partitionSpec != null) {
      carbonStoreLocation =
          partitionSpec.getLocation().toString() + CarbonCommonConstants.FILE_SEPARATOR + loadModel
              .getFactTimeStamp() + ".tmp";
    } else {
      carbonStoreLocation = CarbonDataProcessorUtil
          .createCarbonStoreLocation(loadModel.getCarbonDataLoadSchema().getCarbonTable(),
              loadModel.getSegmentId());
    }
    CarbonFactDataHandlerModel carbonFactDataHandlerModel = CarbonFactDataHandlerModel
        .getCarbonFactDataHandlerModel(loadModel,
            loadModel.getCarbonDataLoadSchema().getCarbonTable(), segProp, tableName,
            tempStoreLocation, carbonStoreLocation);
    setDataFileAttributesInModel(loadModel, compactionType, carbonFactDataHandlerModel);
    carbonFactDataHandlerModel.setCompactionFlow(true);
    carbonFactDataHandlerModel.setSegmentId(loadModel.getSegmentId());
    this.noDicAndComplexColumns = carbonFactDataHandlerModel.getNoDictAndComplexColumns();
    dataHandler = new CarbonFactDataHandlerColumnar(carbonFactDataHandlerModel);
  }

  private void initRecordHolderHeap(List<RawResultIterator> rawResultIteratorList) {
    // create a priority queue that orders the raw result iterators by their current row key
    recordHolderHeap = new PriorityQueue<RawResultIterator>(rawResultIteratorList.size(),
        new RowResultMergerProcessor.CarbonMdkeyComparator());
  }

  /**
   * Merge function
   */
  public boolean execute(List<RawResultIterator> unsortedResultIteratorList,
      List<RawResultIterator> sortedResultIteratorList) throws Exception {
    List<RawResultIterator> finalIteratorList = new ArrayList<>(unsortedResultIteratorList);
    finalIteratorList.addAll(sortedResultIteratorList);

    initRecordHolderHeap(finalIteratorList);
    boolean mergeStatus = false;
    int index = 0;
    boolean isDataPresent = false;
    try {

      // add all iterators to the queue
      for (RawResultIterator leafTupleIterator : finalIteratorList) {
        this.recordHolderHeap.add(leafTupleIterator);
        index++;
      }
      RawResultIterator iterator = null;
      while (index > 1) {
        // poll the iterator with the smallest current record
        iterator = this.recordHolderHeap.poll();
        Object[] convertedRow = iterator.next();
        if (null == convertedRow) {
          index--;
          iterator.close();
          continue;
        }
        if (!isDataPresent) {
          dataHandler.initialise();
          isDataPresent = true;
        }
        // write the row (keyed by its mdkey) through the data handler
        addRow(convertedRow);
        // if the iterator has no more records, decrement the index
        if (!iterator.hasNext()) {
          index--;
          iterator.close();
          continue;
        }
        // add the iterator back to the heap
        this.recordHolderHeap.add(iterator);
      }
      // if the record holder heap is not empty, drain the last remaining iterator
      iterator = this.recordHolderHeap.poll();
      if (null != iterator) {
        while (true) {
          Object[] convertedRow = iterator.next();
          if (null == convertedRow) {
            iterator.close();
            break;
          }
          // initialise the data handler only once
          if (!isDataPresent) {
            dataHandler.initialise();
            isDataPresent = true;
          }
          addRow(convertedRow);
          // stop once the iterator is exhausted
          if (!iterator.hasNext()) {
            break;
          }
        }
      }
      if (isDataPresent) {
        this.dataHandler.finish();
      }
      mergeStatus = true;
    } catch (Exception e) {
      mergeStatus = false;
      LOGGER.error(e.getLocalizedMessage(), e);
      throw e;
    } finally {
      try {
        if (isDataPresent) {
          this.dataHandler.closeHandler();
        }
        if (partitionSpec != null) {
          SegmentFileStore.writeSegmentFile(loadModel.getTablePath(), loadModel.getTaskNo(),
              partitionSpec.getLocation().toString(), loadModel.getFactTimeStamp() + "",
              partitionSpec.getPartitions());
        }
      } catch (CarbonDataWriterException | IOException e) {
        mergeStatus = false;
        throw e;
      }
    }

    return mergeStatus;
  }

  @Override
  public void close() {
    // close data handler
    if (null != dataHandler) {
      dataHandler.closeHandler();
    }
  }

  /**
   * Below method will be used to add a sorted row to the data handler.
   *
   * @throws SliceMergerException
   */
  private void addRow(Object[] carbonTuple) throws SliceMergerException {
    CarbonRow row = WriteStepRowUtil.fromMergerRow(carbonTuple, segprop, noDicAndComplexColumns);
    try {
      this.dataHandler.addDataToStore(row);
    } catch (CarbonDataWriterException e) {
      throw new SliceMergerException("Problem in merging the slice", e);
    }
  }

  /**
   * Comparator class for comparing 2 raw row results.
   */
  private class CarbonMdkeyComparator implements Comparator<RawResultIterator> {
    int[] columnValueSizes = segprop.getEachDimColumnValueSize();

    public CarbonMdkeyComparator() {
      initSortColumns();
    }

    private void initSortColumns() {
      int numberOfSortColumns = segprop.getNumberOfSortColumns();
      if (numberOfSortColumns != columnValueSizes.length) {
        int[] sortColumnValueSizes = new int[numberOfSortColumns];
        System.arraycopy(columnValueSizes, 0, sortColumnValueSizes, 0, numberOfSortColumns);
        this.columnValueSizes = sortColumnValueSizes;
      }
    }

    @Override public int compare(RawResultIterator o1, RawResultIterator o2) {
      Object[] row1 = new Object[0];
      Object[] row2 = new Object[0];
      try {
        row1 = o1.fetchConverted();
        row2 = o2.fetchConverted();
      } catch (KeyGenException e) {
        LOGGER.error(e.getMessage(), e);
      }
      if (null == row1 || null == row2) {
        return 0;
      }
      ByteArrayWrapper key1 = (ByteArrayWrapper) row1[0];
      ByteArrayWrapper key2 = (ByteArrayWrapper) row2[0];
      int compareResult = 0;
      int dictionaryKeyOffset = 0;
      byte[] dimCols1 = key1.getDictionaryKey();
      byte[] dimCols2 = key2.getDictionaryKey();
      int noDicIndex = 0;
      for (int eachColumnValueSize : columnValueSizes) {
        // case of dictionary columns
        if (eachColumnValueSize > 0) {
          compareResult = ByteUtil.UnsafeComparer.INSTANCE
              .compareTo(dimCols1, dictionaryKeyOffset, eachColumnValueSize, dimCols2,
                  dictionaryKeyOffset, eachColumnValueSize);
          dictionaryKeyOffset += eachColumnValueSize;
        } else { // case of no-dictionary columns
          byte[] noDictionaryDim1 = key1.getNoDictionaryKeyByIndex(noDicIndex);
          byte[] noDictionaryDim2 = key2.getNoDictionaryKeyByIndex(noDicIndex);
          compareResult =
              ByteUtil.UnsafeComparer.INSTANCE.compareTo(noDictionaryDim1, noDictionaryDim2);
          noDicIndex++;
        }
        if (0 != compareResult) {
          return compareResult;
        }
      }
      return 0;
    }
  }

}
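The execute() method above is a k-way merge: every RawResultIterator is offered to a priority queue ordered by CarbonMdkeyComparator, the iterator with the smallest current record is repeatedly polled, its row is written through the fact data handler, and the iterator is re-offered while it still has rows. A minimal, self-contained sketch of that same heap discipline, using plain java.util iterators rather than CarbonData's API (the names KWayMergeSketch, Head, and merge are illustrative only, not part of the project), is shown below for reference:

import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

// Generic k-way merge over already-sorted iterators, mirroring the
// PriorityQueue-based loop in execute(): poll the smallest head, emit it,
// and re-offer the source iterator if it still has elements.
// Hypothetical example class, not part of CarbonData.
public final class KWayMergeSketch {

  // Wraps one source iterator together with its current (peeked) value,
  // playing the role RawResultIterator plays via fetchConverted().
  private static final class Head {
    int value;
    Iterator<Integer> source;

    Head(int value, Iterator<Integer> source) {
      this.value = value;
      this.source = source;
    }
  }

  public static List<Integer> merge(List<Iterator<Integer>> sortedIterators) {
    PriorityQueue<Head> heap =
        new PriorityQueue<>(Math.max(1, sortedIterators.size()),
            Comparator.comparingInt((Head h) -> h.value));
    // seed the heap with the first element of each non-empty iterator
    for (Iterator<Integer> it : sortedIterators) {
      if (it.hasNext()) {
        heap.add(new Head(it.next(), it));
      }
    }
    List<Integer> merged = new ArrayList<>();
    while (!heap.isEmpty()) {
      Head smallest = heap.poll();
      merged.add(smallest.value);
      // advance the source and re-offer it while it still has rows
      if (smallest.source.hasNext()) {
        smallest.value = smallest.source.next();
        heap.add(smallest);
      }
    }
    return merged;
  }
}

The main difference from the sketch is that RawResultIterator exposes its current row to the comparator through fetchConverted() and advances with next(), and the merged rows go to CarbonFactDataHandlerColumnar instead of a list, but the queue-based ordering is the same.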