apache / carbondata / build 2965 (push, via Jenkins, from GitHub)
Merge 58e2cb11f into cdcea33bb
Build status: pending completion

85 of 85 new or added lines in 9 files covered. (100.0%)
62868 of 79380 relevant lines covered (79.2%)
1.05 hits per line

Source File: /hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java (89.36% covered)
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.carbondata.hadoop.api;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datamap.Segment;
import org.apache.carbondata.core.datamap.status.DataMapStatusManager;
import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.indexstore.PartitionSpec;
import org.apache.carbondata.core.locks.CarbonLockFactory;
import org.apache.carbondata.core.locks.ICarbonLock;
import org.apache.carbondata.core.locks.LockUsage;
import org.apache.carbondata.core.metadata.SegmentFileStore;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
import org.apache.carbondata.core.statusmanager.SegmentStatus;
import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
import org.apache.carbondata.core.util.CarbonSessionInfo;
import org.apache.carbondata.core.util.ThreadLocalSessionInfo;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.events.OperationContext;
import org.apache.carbondata.events.OperationListenerBus;
import org.apache.carbondata.processing.loading.events.LoadEvents;
import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
import org.apache.carbondata.processing.util.CarbonLoaderUtil;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.log4j.Logger;

/**
 * OutputCommitter which manages segments during loading. It commits the segment information to
 * the tablestatus file upon success or failure.
 */
public class CarbonOutputCommitter extends FileOutputCommitter {

  private static final Logger LOGGER =
      LogServiceFactory.getLogService(CarbonOutputCommitter.class.getName());

  private ICarbonLock segmentLock;

  public CarbonOutputCommitter(Path outputPath, TaskAttemptContext context) throws IOException {
    super(outputPath, context);
  }

  /**
   * Updates the tablestatus to in-progress while setting up the job.
   *
   * @param context
   * @throws IOException
   */
  @Override public void setupJob(JobContext context) throws IOException {
    super.setupJob(context);
    boolean overwriteSet = CarbonTableOutputFormat.isOverwriteSet(context.getConfiguration());
    CarbonLoadModel loadModel = CarbonTableOutputFormat.getLoadModel(context.getConfiguration());
    if (loadModel.getSegmentId() == null) {
      CarbonLoaderUtil.readAndUpdateLoadProgressInTableMeta(loadModel, overwriteSet);
    }
    // Take the segment lock
    segmentLock = CarbonLockFactory.getCarbonLockObj(
        loadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier(),
        CarbonTablePath.addSegmentPrefix(loadModel.getSegmentId()) + LockUsage.LOCK);
    if (!segmentLock.lockWithRetries()) {
      throw new RuntimeException(
          "Segment is already locked for loading; this is not supposed to happen");
    }
    CarbonTableOutputFormat.setLoadModel(context.getConfiguration(), loadModel);
  }

  @Override public void setupTask(TaskAttemptContext context) throws IOException {
    super.setupTask(context);
  }

  /**
   * Updates the tablestatus to success after the job succeeds.
   *
   * @param context
   * @throws IOException
   */
  @Override public void commitJob(JobContext context) throws IOException {
    try {
      super.commitJob(context);
    } catch (IOException e) {
      // Ignore. With concurrent loads, another load removing its temporary folders may cause a
      // file-not-found exception here; this does not impact the carbon load.
      LOGGER.warn(e.getMessage());
    }
    boolean overwriteSet = CarbonTableOutputFormat.isOverwriteSet(context.getConfiguration());
    CarbonLoadModel loadModel = CarbonTableOutputFormat.getLoadModel(context.getConfiguration());
    LoadMetadataDetails newMetaEntry = loadModel.getCurrentLoadMetadataDetail();
    String readPath = CarbonTablePath.getSegmentFilesLocation(loadModel.getTablePath())
        + CarbonCommonConstants.FILE_SEPARATOR
        + loadModel.getSegmentId() + "_" + loadModel.getFactTimeStamp() + ".tmp";
    // Merge all partition files into a single file.
    String segmentFileName = SegmentFileStore.genSegmentFileName(
        loadModel.getSegmentId(), String.valueOf(loadModel.getFactTimeStamp()));
    SegmentFileStore.SegmentFile segmentFile = SegmentFileStore
        .mergeSegmentFiles(readPath, segmentFileName,
            CarbonTablePath.getSegmentFilesLocation(loadModel.getTablePath()));
    if (segmentFile != null) {
      if (null == newMetaEntry) {
        throw new RuntimeException("Internal Error");
      }
      // Move all files from the temp directory of each segment to the partition directory
      SegmentFileStore.moveFromTempFolder(segmentFile,
          loadModel.getSegmentId() + "_" + loadModel.getFactTimeStamp() + ".tmp",
          loadModel.getTablePath());
      newMetaEntry.setSegmentFile(segmentFileName + CarbonTablePath.SEGMENT_EXT);
    }
    OperationContext operationContext = (OperationContext) getOperationContext();
    String uuid = "";
    if (loadModel.getCarbonDataLoadSchema().getCarbonTable().isChildDataMap()
        && operationContext != null) {
      uuid = operationContext.getProperty("uuid").toString();
    }
    // Mark the new segment as SUCCESS in the metadata entry and record its data/index size
    CarbonLoaderUtil
        .populateNewLoadMetaEntry(newMetaEntry, SegmentStatus.SUCCESS, loadModel.getFactTimeStamp(),
            true);
    CarbonTable carbonTable = loadModel.getCarbonDataLoadSchema().getCarbonTable();
    long segmentSize = CarbonLoaderUtil
        .addDataIndexSizeIntoMetaEntry(newMetaEntry, loadModel.getSegmentId(), carbonTable);
    if (segmentSize > 0 || overwriteSet) {
      if (operationContext != null && carbonTable.hasAggregationDataMap()) {
        operationContext
            .setProperty("current.segmentfile", newMetaEntry.getSegmentFile());
        LoadEvents.LoadTablePreStatusUpdateEvent event =
            new LoadEvents.LoadTablePreStatusUpdateEvent(carbonTable.getCarbonTableIdentifier(),
                loadModel);
        try {
          OperationListenerBus.getInstance().fireEvent(event, operationContext);
        } catch (Exception e) {
          throw new IOException(e);
        }
      }
      String uniqueId = null;
      if (overwriteSet) {
        if (!loadModel.isCarbonTransactionalTable()) {
          CarbonLoaderUtil.deleteNonTransactionalTableForInsertOverwrite(loadModel);
        } else {
          if (segmentSize == 0) {
            newMetaEntry.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE);
          }
          uniqueId = overwritePartitions(loadModel, newMetaEntry, uuid);
        }
      } else {
        CarbonLoaderUtil.recordNewLoadMetadata(newMetaEntry, loadModel, false, false, uuid);
      }
      DataMapStatusManager.disableAllLazyDataMaps(carbonTable);
      if (operationContext != null) {
        LoadEvents.LoadTablePostStatusUpdateEvent postStatusUpdateEvent =
            new LoadEvents.LoadTablePostStatusUpdateEvent(loadModel);
        try {
          OperationListenerBus.getInstance()
              .fireEvent(postStatusUpdateEvent, operationContext);
        } catch (Exception e) {
          throw new IOException(e);
        }
      }
      // Apply any update timestamp and segments-to-be-deleted carried in the job configuration
      String updateTime =
          context.getConfiguration().get(CarbonTableOutputFormat.UPADTE_TIMESTAMP, null);
      String segmentsToBeDeleted =
          context.getConfiguration().get(CarbonTableOutputFormat.SEGMENTS_TO_BE_DELETED, "");
      List<Segment> segmentDeleteList = Segment.toSegmentList(segmentsToBeDeleted.split(","), null);
      Set<Segment> segmentSet = new HashSet<>(
          new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier(),
              context.getConfiguration()).getValidAndInvalidSegments().getValidSegments());
      if (updateTime != null) {
        CarbonUpdateUtil.updateTableMetadataStatus(segmentSet, carbonTable, updateTime, true,
            segmentDeleteList);
      } else if (uniqueId != null) {
        CarbonUpdateUtil.updateTableMetadataStatus(segmentSet, carbonTable, uniqueId, true,
            segmentDeleteList);
      }
    } else {
      CarbonLoaderUtil.updateTableStatusForFailure(loadModel);
    }
    if (segmentLock != null) {
      segmentLock.unlock();
    }
  }

  /**
   * Overwrites the partitions in case of an overwrite query. It only updates the partition map
   * files of the segment files.
   *
   * @param loadModel
   * @return
   * @throws IOException
   */
  private String overwritePartitions(CarbonLoadModel loadModel, LoadMetadataDetails newMetaEntry,
      String uuid) throws IOException {
    CarbonTable table = loadModel.getCarbonDataLoadSchema().getCarbonTable();
    SegmentFileStore fileStore = new SegmentFileStore(loadModel.getTablePath(),
        loadModel.getSegmentId() + "_" + loadModel.getFactTimeStamp()
            + CarbonTablePath.SEGMENT_EXT);
    List<PartitionSpec> partitionSpecs = fileStore.getPartitionSpecs();

    if (partitionSpecs != null && partitionSpecs.size() > 0) {
      List<Segment> validSegments =
          new SegmentStatusManager(table.getAbsoluteTableIdentifier())
              .getValidAndInvalidSegments().getValidSegments();
      String uniqueId = String.valueOf(System.currentTimeMillis());
      List<String> tobeUpdatedSegs = new ArrayList<>();
      List<String> tobeDeletedSegs = new ArrayList<>();
      // First drop the partitions from the partition mapper files of each segment
      for (Segment segment : validSegments) {
        new SegmentFileStore(table.getTablePath(), segment.getSegmentFileName())
            .dropPartitions(segment, partitionSpecs, uniqueId, tobeDeletedSegs, tobeUpdatedSegs);
      }
      newMetaEntry.setUpdateStatusFileName(uniqueId);
      // Commit the removed partitions in the carbon store.
      CarbonLoaderUtil.recordNewLoadMetadata(newMetaEntry, loadModel, false, false, uuid,
          Segment.toSegmentList(tobeDeletedSegs, null),
          Segment.toSegmentList(tobeUpdatedSegs, null));
      return uniqueId;
    }
    return null;
  }

  private Object getOperationContext() {
    // Fetch the operation context, if any, that the caller stashed in the thread-local
    // session info
    CarbonSessionInfo carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo();
    if (carbonSessionInfo != null) {
      return carbonSessionInfo.getThreadParams().getExtraInfo("partition.operationcontext");
    }
    return null;
  }

  /**
   * Updates the tablestatus to failure if the job fails, and also cleans up the temp folders
   * if any exist.
   *
   * @param context
   * @param state
   * @throws IOException
   */
  @Override public void abortJob(JobContext context, JobStatus.State state) throws IOException {
    try {
      super.abortJob(context, state);
      CarbonLoadModel loadModel = CarbonTableOutputFormat.getLoadModel(context.getConfiguration());
      CarbonLoaderUtil.updateTableStatusForFailure(loadModel);
      String segmentFileName = loadModel.getSegmentId() + "_" + loadModel.getFactTimeStamp();
      LoadMetadataDetails metadataDetail = loadModel.getCurrentLoadMetadataDetail();
      if (metadataDetail != null) {
        // If the segment file was already created for this job, just link it so that it will
        // be used while cleaning.
        if (!metadataDetail.getSegmentStatus().equals(SegmentStatus.SUCCESS)) {
          String readPath = CarbonTablePath.getSegmentFilesLocation(loadModel.getTablePath())
              + CarbonCommonConstants.FILE_SEPARATOR + segmentFileName
              + CarbonTablePath.SEGMENT_EXT;
          if (FileFactory.getCarbonFile(readPath).exists()) {
            metadataDetail.setSegmentFile(segmentFileName + CarbonTablePath.SEGMENT_EXT);
          }
        }
      }
      // Clean the temp files
      CarbonFile segTmpFolder = FileFactory.getCarbonFile(
          CarbonTablePath.getSegmentFilesLocation(loadModel.getTablePath())
              + CarbonCommonConstants.FILE_SEPARATOR + segmentFileName + ".tmp");
      // Delete the temp segment folder
      if (segTmpFolder.exists()) {
        FileFactory.deleteAllCarbonFilesOfDir(segTmpFolder);
      }
      CarbonFile segmentFilePath = FileFactory.getCarbonFile(
          CarbonTablePath.getSegmentFilesLocation(loadModel.getTablePath())
              + CarbonCommonConstants.FILE_SEPARATOR + segmentFileName
              + CarbonTablePath.SEGMENT_EXT);
      // Delete the temp data folders of this job if they exist
      if (segmentFilePath.exists()) {
        SegmentFileStore fileStore = new SegmentFileStore(loadModel.getTablePath(),
            segmentFileName + CarbonTablePath.SEGMENT_EXT);
        SegmentFileStore.removeTempFolder(fileStore.getLocationMap(), segmentFileName + ".tmp",
            loadModel.getTablePath());
      }
      LOGGER.error("Loading failed with job status : " + state);
    } finally {
      if (segmentLock != null) {
        segmentLock.unlock();
      }
    }
  }

}
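For context, the sketch below shows how the MapReduce framework typically drives a committer like this one: setupJob marks the segment in-progress and takes the segment lock, commitJob merges the segment files and records success, and abortJob records failure and cleans up temp folders. This is a minimal illustration only, not CarbonData code: the CommitterLifecycleSketch class is hypothetical, and it assumes the Configuration has already been populated through CarbonTableOutputFormat (load model, overwrite flag, and so on), which is elided here.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;

public class CommitterLifecycleSketch {

  public static void run(Configuration conf, Path outputPath) throws IOException {
    // A hypothetical task attempt context; real jobs receive theirs from the framework,
    // and conf is assumed to already carry the CarbonLoadModel set via
    // CarbonTableOutputFormat.setLoadModel(...).
    TaskAttemptContext attempt = new TaskAttemptContextImpl(conf, new TaskAttemptID());
    CarbonOutputCommitter committer = new CarbonOutputCommitter(outputPath, attempt);

    committer.setupJob(attempt);      // tablestatus -> in-progress, segment lock taken
    committer.setupTask(attempt);
    try {
      // ... record writers produce the segment's data files here ...
      committer.commitTask(attempt);  // promote this attempt's output
      committer.commitJob(attempt);   // merge segment files, tablestatus -> SUCCESS, lock released
    } catch (IOException e) {
      // tablestatus -> failure, temp folders cleaned, lock released in finally
      committer.abortJob(attempt, JobStatus.State.FAILED);
      throw e;
    }
  }
}

Note that both paths release the segment lock taken in setupJob: commitJob unlocks at the end of a successful run, and abortJob unlocks in its finally block.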