• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #9662

pending completion
#9662

push

travis_ci

web-flow
Fix error update alias in PB_Tree WrappedSegment

4 of 4 new or added lines in 1 file covered. (100.0%)

79101 of 165738 relevant lines covered (47.73%)

0.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.37
/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/tag/TagManager.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.iotdb.db.schemaengine.schemaregion.tag;
21

22
import org.apache.iotdb.commons.conf.CommonConfig;
23
import org.apache.iotdb.commons.conf.CommonDescriptor;
24
import org.apache.iotdb.commons.exception.MetadataException;
25
import org.apache.iotdb.commons.file.SystemFileFactory;
26
import org.apache.iotdb.commons.path.PartialPath;
27
import org.apache.iotdb.commons.schema.filter.SchemaFilter;
28
import org.apache.iotdb.commons.schema.filter.SchemaFilterType;
29
import org.apache.iotdb.commons.schema.filter.impl.TagFilter;
30
import org.apache.iotdb.commons.schema.node.IMNode;
31
import org.apache.iotdb.commons.schema.node.role.IMeasurementMNode;
32
import org.apache.iotdb.commons.schema.tree.SchemaIterator;
33
import org.apache.iotdb.commons.utils.FileUtils;
34
import org.apache.iotdb.db.schemaengine.SchemaConstant;
35
import org.apache.iotdb.db.schemaengine.schemaregion.read.req.IShowTimeSeriesPlan;
36
import org.apache.iotdb.db.schemaengine.schemaregion.read.resp.info.ITimeSeriesSchemaInfo;
37
import org.apache.iotdb.db.schemaengine.schemaregion.read.resp.info.impl.ShowTimeSeriesResult;
38
import org.apache.iotdb.db.schemaengine.schemaregion.read.resp.reader.ISchemaReader;
39
import org.apache.iotdb.db.schemaengine.schemaregion.read.resp.reader.impl.SchemaReaderLimitOffsetWrapper;
40
import org.apache.iotdb.db.schemaengine.schemaregion.read.resp.reader.impl.TimeseriesReaderWithViewFetch;
41
import org.apache.iotdb.tsfile.utils.Pair;
42

43
import org.slf4j.Logger;
44
import org.slf4j.LoggerFactory;
45

46
import java.io.File;
47
import java.io.IOException;
48
import java.util.ArrayList;
49
import java.util.Collections;
50
import java.util.Comparator;
51
import java.util.HashMap;
52
import java.util.HashSet;
53
import java.util.Iterator;
54
import java.util.List;
55
import java.util.Map;
56
import java.util.NoSuchElementException;
57
import java.util.Set;
58
import java.util.concurrent.ConcurrentHashMap;
59

60
import static java.util.stream.Collectors.toList;
61

62
public class TagManager {
63

64
  private static final String TAG_FORMAT = "tag key is %s, tag value is %s, tlog offset is %d";
65
  private static final String DEBUG_MSG = "%s : TimeSeries %s is removed from tag inverted index, ";
66
  private static final String DEBUG_MSG_1 =
67
      "%s: TimeSeries %s's tag info has been removed from tag inverted index ";
68
  private static final String PREVIOUS_CONDITION =
69
      "before deleting it, tag key is %s, tag value is %s, tlog offset is %d, contains key %b";
70

71
  private static final Logger logger = LoggerFactory.getLogger(TagManager.class);
1✔
72
  private static final CommonConfig COMMON_CONFIG = CommonDescriptor.getInstance().getConfig();
1✔
73

74
  private TagLogFile tagLogFile;
75
  // tag key -> tag value -> LeafMNode
76
  private final Map<String, Map<String, Set<IMeasurementMNode<?>>>> tagIndex =
1✔
77
      new ConcurrentHashMap<>();
78

79
  public TagManager(String sgSchemaDirPath) throws IOException {
1✔
80
    tagLogFile = new TagLogFile(sgSchemaDirPath, SchemaConstant.TAG_LOG);
1✔
81
  }
1✔
82

83
  public synchronized boolean createSnapshot(File targetDir) {
84
    File tagLogSnapshot =
1✔
85
        SystemFileFactory.INSTANCE.getFile(targetDir, SchemaConstant.TAG_LOG_SNAPSHOT);
1✔
86
    File tagLogSnapshotTmp =
1✔
87
        SystemFileFactory.INSTANCE.getFile(targetDir, SchemaConstant.TAG_LOG_SNAPSHOT_TMP);
1✔
88
    try {
89
      tagLogFile.copyTo(tagLogSnapshotTmp);
1✔
90
      if (tagLogSnapshot.exists() && !FileUtils.deleteFileIfExist(tagLogSnapshot)) {
1✔
91
        logger.warn(
×
92
            "Failed to delete old snapshot {} while creating tagManager snapshot.",
93
            tagLogSnapshot.getName());
×
94
        return false;
×
95
      }
96
      if (!tagLogSnapshotTmp.renameTo(tagLogSnapshot)) {
1✔
97
        logger.warn(
×
98
            "Failed to rename {} to {} while creating tagManager snapshot.",
99
            tagLogSnapshotTmp.getName(),
×
100
            tagLogSnapshot.getName());
×
101
        if (!FileUtils.deleteFileIfExist(tagLogSnapshot)) {
×
102
          logger.warn("Failed to delete {} after renaming failure.", tagLogSnapshot.getName());
×
103
        }
104
        return false;
×
105
      }
106

107
      return true;
1✔
108
    } catch (IOException e) {
×
109
      logger.error("Failed to create tagManager snapshot due to {}", e.getMessage(), e);
×
110
      if (!FileUtils.deleteFileIfExist(tagLogSnapshot)) {
×
111
        logger.warn(
×
112
            "Failed to delete {} after creating tagManager snapshot failure.",
113
            tagLogSnapshot.getName());
×
114
      }
115
      return false;
×
116
    } finally {
117
      if (!FileUtils.deleteFileIfExist(tagLogSnapshotTmp)) {
1✔
118
        logger.warn("Failed to delete {}.", tagLogSnapshotTmp.getName());
×
119
      }
120
    }
121
  }
122

123
  public static TagManager loadFromSnapshot(File snapshotDir, String sgSchemaDirPath)
124
      throws IOException {
125
    File tagSnapshot =
1✔
126
        SystemFileFactory.INSTANCE.getFile(snapshotDir, SchemaConstant.TAG_LOG_SNAPSHOT);
1✔
127
    File tagFile = SystemFileFactory.INSTANCE.getFile(sgSchemaDirPath, SchemaConstant.TAG_LOG);
1✔
128
    if (tagFile.exists() && !tagFile.delete()) {
1✔
129
      logger.warn("Failed to delete existing {} when loading snapshot.", tagFile.getName());
×
130
    }
131

132
    try {
133
      org.apache.commons.io.FileUtils.copyFile(tagSnapshot, tagFile);
1✔
134
      return new TagManager(sgSchemaDirPath);
1✔
135
    } catch (IOException e) {
×
136
      if (!tagFile.delete()) {
×
137
        logger.warn(
×
138
            "Failed to delete existing {} when copying snapshot failure.", tagFile.getName());
×
139
      }
140
      throw e;
×
141
    }
142
  }
143

144
  public boolean recoverIndex(long offset, IMeasurementMNode<?> measurementMNode)
145
      throws IOException {
146
    Map<String, String> tags = tagLogFile.readTag(COMMON_CONFIG.getTagAttributeTotalSize(), offset);
1✔
147
    if (tags == null || tags.isEmpty()) {
1✔
148
      return false;
×
149
    } else {
150
      addIndex(tags, measurementMNode);
1✔
151
      return true;
1✔
152
    }
153
  }
154

155
  public void addIndex(String tagKey, String tagValue, IMeasurementMNode<?> measurementMNode) {
156
    if (tagKey == null || tagValue == null || measurementMNode == null) {
1✔
157
      return;
×
158
    }
159
    tagIndex
1✔
160
        .computeIfAbsent(tagKey, k -> new ConcurrentHashMap<>())
1✔
161
        .computeIfAbsent(tagValue, v -> Collections.synchronizedSet(new HashSet<>()))
1✔
162
        .add(measurementMNode);
1✔
163
  }
1✔
164

165
  public void addIndex(Map<String, String> tagsMap, IMeasurementMNode<?> measurementMNode) {
166
    if (tagsMap != null && measurementMNode != null) {
1✔
167
      for (Map.Entry<String, String> entry : tagsMap.entrySet()) {
1✔
168
        addIndex(entry.getKey(), entry.getValue(), measurementMNode);
1✔
169
      }
1✔
170
    }
171
  }
1✔
172

173
  public void removeIndex(String tagKey, String tagValue, IMeasurementMNode<?> measurementMNode) {
174
    tagIndex.get(tagKey).get(tagValue).remove(measurementMNode);
1✔
175
    if (tagIndex.get(tagKey).get(tagValue).isEmpty()) {
1✔
176
      tagIndex.get(tagKey).remove(tagValue);
1✔
177
    }
178
  }
1✔
179

180
  private List<IMeasurementMNode<?>> getMatchedTimeseriesInIndex(TagFilter tagFilter) {
181
    if (!tagIndex.containsKey(tagFilter.getKey())) {
1✔
182
      return Collections.emptyList();
1✔
183
    }
184
    Map<String, Set<IMeasurementMNode<?>>> value2Node = tagIndex.get(tagFilter.getKey());
1✔
185
    if (value2Node.isEmpty()) {
1✔
186
      return Collections.emptyList();
×
187
    }
188

189
    List<IMeasurementMNode<?>> allMatchedNodes = new ArrayList<>();
1✔
190
    if (tagFilter.isContains()) {
1✔
191
      for (Map.Entry<String, Set<IMeasurementMNode<?>>> entry : value2Node.entrySet()) {
×
192
        if (entry.getKey() == null || entry.getValue() == null) {
×
193
          continue;
×
194
        }
195
        String tagValue = entry.getKey();
×
196
        if (tagValue.contains(tagFilter.getValue())) {
×
197
          allMatchedNodes.addAll(entry.getValue());
×
198
        }
199
      }
×
200
    } else {
201
      for (Map.Entry<String, Set<IMeasurementMNode<?>>> entry : value2Node.entrySet()) {
1✔
202
        if (entry.getKey() == null || entry.getValue() == null) {
1✔
203
          continue;
×
204
        }
205
        String tagValue = entry.getKey();
1✔
206
        if (tagFilter.getValue().equals(tagValue)) {
1✔
207
          allMatchedNodes.addAll(entry.getValue());
1✔
208
        }
209
      }
1✔
210
    }
211
    // we just sort them by the alphabetical order
212
    allMatchedNodes =
1✔
213
        allMatchedNodes.stream()
1✔
214
            .sorted(Comparator.comparing(IMNode::getFullPath))
1✔
215
            .collect(toList());
1✔
216

217
    return allMatchedNodes;
1✔
218
  }
219

220
  public ISchemaReader<ITimeSeriesSchemaInfo> getTimeSeriesReaderWithIndex(
221
      IShowTimeSeriesPlan plan) {
222
    // schemaFilter must not null
223
    SchemaFilter schemaFilter = plan.getSchemaFilter();
1✔
224
    // currently, only one TagFilter is supported
225
    // all IMeasurementMNode in allMatchedNodes satisfied TagFilter
226
    Iterator<IMeasurementMNode<?>> allMatchedNodes =
1✔
227
        getMatchedTimeseriesInIndex(
1✔
228
                (TagFilter) SchemaFilter.extract(schemaFilter, SchemaFilterType.TAGS_FILTER).get(0))
1✔
229
            .iterator();
1✔
230
    PartialPath pathPattern = plan.getPath();
1✔
231
    SchemaIterator<ITimeSeriesSchemaInfo> schemaIterator =
1✔
232
        new SchemaIterator<ITimeSeriesSchemaInfo>() {
1✔
233
          private ITimeSeriesSchemaInfo nextMatched;
234
          private Throwable throwable;
235

236
          @Override
237
          public boolean hasNext() {
238
            if (throwable == null && nextMatched == null) {
1✔
239
              try {
240
                getNext();
1✔
241
              } catch (Throwable e) {
×
242
                throwable = e;
×
243
              }
1✔
244
            }
245
            return throwable == null && nextMatched != null;
1✔
246
          }
247

248
          @Override
249
          public ITimeSeriesSchemaInfo next() {
250
            if (!hasNext()) {
1✔
251
              throw new NoSuchElementException();
×
252
            }
253
            ITimeSeriesSchemaInfo result = nextMatched;
1✔
254
            nextMatched = null;
1✔
255
            return result;
1✔
256
          }
257

258
          private void getNext() throws IOException {
259
            nextMatched = null;
1✔
260
            while (allMatchedNodes.hasNext()) {
1✔
261
              IMeasurementMNode<?> node = allMatchedNodes.next();
1✔
262
              if (plan.isPrefixMatch()
1✔
263
                  ? pathPattern.prefixMatchFullPath(node.getPartialPath())
1✔
264
                  : pathPattern.matchFullPath(node.getPartialPath())) {
1✔
265
                Pair<Map<String, String>, Map<String, String>> tagAndAttributePair =
1✔
266
                    readTagFile(node.getOffset());
1✔
267
                nextMatched =
1✔
268
                    new ShowTimeSeriesResult(
269
                        node.getFullPath(),
1✔
270
                        node.getAlias(),
1✔
271
                        node.getSchema(),
1✔
272
                        tagAndAttributePair.left,
273
                        tagAndAttributePair.right,
274
                        node.getParent().getAsDeviceMNode().isAligned());
1✔
275
                break;
1✔
276
              }
277
            }
×
278
          }
1✔
279

280
          @Override
281
          public Throwable getFailure() {
282
            return throwable;
×
283
          }
284

285
          @Override
286
          public boolean isSuccess() {
287
            return throwable == null;
×
288
          }
289

290
          @Override
291
          public void close() {
292
            // do nothing
293
          }
1✔
294
        };
295
    ISchemaReader<ITimeSeriesSchemaInfo> reader =
1✔
296
        new TimeseriesReaderWithViewFetch(schemaIterator, schemaFilter);
297
    if (plan.getLimit() > 0 || plan.getOffset() > 0) {
1✔
298
      return new SchemaReaderLimitOffsetWrapper<>(reader, plan.getLimit(), plan.getOffset());
×
299
    } else {
300
      return reader;
1✔
301
    }
302
  }
303

304
  /**
305
   * Remove the node from the tag inverted index.
306
   *
307
   * @throws IOException error occurred when reading disk
308
   */
309
  public void removeFromTagInvertedIndex(IMeasurementMNode<?> node) throws IOException {
310
    if (node.getOffset() < 0) {
1✔
311
      return;
1✔
312
    }
313
    Map<String, String> tagMap =
1✔
314
        tagLogFile.readTag(COMMON_CONFIG.getTagAttributeTotalSize(), node.getOffset());
1✔
315
    if (tagMap != null) {
1✔
316
      for (Map.Entry<String, String> entry : tagMap.entrySet()) {
1✔
317
        if (tagIndex.containsKey(entry.getKey())
1✔
318
            && tagIndex.get(entry.getKey()).containsKey(entry.getValue())) {
1✔
319
          if (logger.isDebugEnabled()) {
1✔
320
            logger.debug(
×
321
                String.format(
×
322
                    String.format(DEBUG_MSG, "Delete" + TAG_FORMAT, node.getFullPath()),
×
323
                    entry.getKey(),
×
324
                    entry.getValue(),
×
325
                    node.getOffset()));
×
326
          }
327
          tagIndex.get(entry.getKey()).get(entry.getValue()).remove(node);
1✔
328
          if (tagIndex.get(entry.getKey()).get(entry.getValue()).isEmpty()) {
1✔
329
            tagIndex.get(entry.getKey()).remove(entry.getValue());
×
330
            if (tagIndex.get(entry.getKey()).isEmpty()) {
×
331
              tagIndex.remove(entry.getKey());
×
332
            }
333
          }
334
        } else {
335
          if (logger.isDebugEnabled()) {
×
336
            logger.debug(
×
337
                String.format(
×
338
                    String.format(DEBUG_MSG_1, "Delete" + PREVIOUS_CONDITION, node.getFullPath()),
×
339
                    entry.getKey(),
×
340
                    entry.getValue(),
×
341
                    node.getOffset(),
×
342
                    tagIndex.containsKey(entry.getKey())));
×
343
          }
344
        }
345
      }
1✔
346
    }
347
  }
1✔
348

349
  /**
350
   * Upsert tags and attributes key-value for the timeseries if the key has existed, just use the
351
   * new value to update it.
352
   *
353
   * @throws MetadataException metadata exception
354
   * @throws IOException error occurred when reading disk
355
   */
356
  public void updateTagsAndAttributes(
357
      Map<String, String> tagsMap,
358
      Map<String, String> attributesMap,
359
      IMeasurementMNode<?> leafMNode)
360
      throws MetadataException, IOException {
361

362
    Pair<Map<String, String>, Map<String, String>> pair =
1✔
363
        tagLogFile.read(COMMON_CONFIG.getTagAttributeTotalSize(), leafMNode.getOffset());
1✔
364

365
    if (tagsMap != null) {
1✔
366
      for (Map.Entry<String, String> entry : tagsMap.entrySet()) {
1✔
367
        String key = entry.getKey();
1✔
368
        String value = entry.getValue();
1✔
369
        String beforeValue = pair.left.get(key);
1✔
370
        pair.left.put(key, value);
1✔
371
        // if the key has existed and the value is not equal to the new one
372
        // we should remove before key-value from inverted index map
373
        if (beforeValue != null && !beforeValue.equals(value)) {
1✔
374

375
          if (tagIndex.containsKey(key) && tagIndex.get(key).containsKey(beforeValue)) {
1✔
376
            if (logger.isDebugEnabled()) {
1✔
377
              logger.debug(
×
378
                  String.format(
×
379
                      String.format(DEBUG_MSG, "Upsert" + TAG_FORMAT, leafMNode.getFullPath()),
×
380
                      key,
381
                      beforeValue,
382
                      leafMNode.getOffset()));
×
383
            }
384

385
            removeIndex(key, beforeValue, leafMNode);
1✔
386
          } else {
387
            if (logger.isDebugEnabled()) {
×
388
              logger.debug(
×
389
                  String.format(
×
390
                      String.format(
×
391
                          DEBUG_MSG_1, "Upsert" + PREVIOUS_CONDITION, leafMNode.getFullPath()),
×
392
                      key,
393
                      beforeValue,
394
                      leafMNode.getOffset(),
×
395
                      tagIndex.containsKey(key)));
×
396
            }
397
          }
398
        }
399

400
        // if the key doesn't exist or the value is not equal to the new one
401
        // we should add a new key-value to inverted index map
402
        if (beforeValue == null || !beforeValue.equals(value)) {
1✔
403
          addIndex(key, value, leafMNode);
1✔
404
        }
405
      }
1✔
406
    }
407

408
    if (attributesMap != null) {
1✔
409
      pair.right.putAll(attributesMap);
1✔
410
    }
411

412
    // persist the change to disk
413
    tagLogFile.write(pair.left, pair.right, leafMNode.getOffset());
1✔
414
  }
1✔
415

416
  /**
417
   * Add new attributes key-value for the timeseries.
418
   *
419
   * @param attributesMap newly added attributes map
420
   * @throws MetadataException tagLogFile write error or attributes already exist
421
   * @throws IOException error occurred when reading disk
422
   */
423
  public void addAttributes(
424
      Map<String, String> attributesMap, PartialPath fullPath, IMeasurementMNode<?> leafMNode)
425
      throws MetadataException, IOException {
426

427
    Pair<Map<String, String>, Map<String, String>> pair =
1✔
428
        tagLogFile.read(COMMON_CONFIG.getTagAttributeTotalSize(), leafMNode.getOffset());
1✔
429

430
    for (Map.Entry<String, String> entry : attributesMap.entrySet()) {
1✔
431
      String key = entry.getKey();
1✔
432
      String value = entry.getValue();
1✔
433
      if (pair.right.containsKey(key)) {
1✔
434
        throw new MetadataException(
1✔
435
            String.format("TimeSeries [%s] already has the attribute [%s].", fullPath, key));
1✔
436
      }
437
      pair.right.put(key, value);
1✔
438
    }
1✔
439

440
    // persist the change to disk
441
    tagLogFile.write(pair.left, pair.right, leafMNode.getOffset());
1✔
442
  }
1✔
443

444
  /**
445
   * Add new tags key-value for the timeseries.
446
   *
447
   * @param tagsMap newly added tags map
448
   * @param fullPath timeseries
449
   * @throws MetadataException tagLogFile write error or tag already exists
450
   * @throws IOException error occurred when reading disk
451
   */
452
  public void addTags(
453
      Map<String, String> tagsMap, PartialPath fullPath, IMeasurementMNode<?> leafMNode)
454
      throws MetadataException, IOException {
455

456
    Pair<Map<String, String>, Map<String, String>> pair =
1✔
457
        tagLogFile.read(COMMON_CONFIG.getTagAttributeTotalSize(), leafMNode.getOffset());
1✔
458

459
    for (Map.Entry<String, String> entry : tagsMap.entrySet()) {
1✔
460
      String key = entry.getKey();
1✔
461
      String value = entry.getValue();
1✔
462
      if (pair.left.containsKey(key)) {
1✔
463
        throw new MetadataException(
1✔
464
            String.format("TimeSeries [%s] already has the tag [%s].", fullPath, key));
1✔
465
      }
466
      pair.left.put(key, value);
×
467
    }
×
468

469
    // persist the change to disk
470
    tagLogFile.write(pair.left, pair.right, leafMNode.getOffset());
×
471

472
    // update tag inverted map
473
    addIndex(tagsMap, leafMNode);
×
474
  }
×
475

476
  /**
477
   * Drop tags or attributes of the timeseries. It will not throw exception even if the key does not
478
   * exist.
479
   *
480
   * @param keySet tags key or attributes key
481
   * @throws MetadataException metadata exception
482
   * @throws IOException error occurred when reading disk
483
   */
484
  public void dropTagsOrAttributes(
485
      Set<String> keySet, PartialPath fullPath, IMeasurementMNode<?> leafMNode)
486
      throws MetadataException, IOException {
487
    Pair<Map<String, String>, Map<String, String>> pair =
1✔
488
        tagLogFile.read(COMMON_CONFIG.getTagAttributeTotalSize(), leafMNode.getOffset());
1✔
489

490
    Map<String, String> deleteTag = new HashMap<>();
1✔
491
    for (String key : keySet) {
1✔
492
      // check tag map
493
      // check attribute map
494
      String removeVal = pair.left.remove(key);
1✔
495
      if (removeVal != null) {
1✔
496
        deleteTag.put(key, removeVal);
1✔
497
      } else {
498
        removeVal = pair.right.remove(key);
1✔
499
        if (removeVal == null) {
1✔
500
          logger.warn("TimeSeries [{}] does not have tag/attribute [{}]", fullPath, key);
1✔
501
        }
502
      }
503
    }
1✔
504

505
    // persist the change to disk
506
    tagLogFile.write(pair.left, pair.right, leafMNode.getOffset());
1✔
507

508
    Map<String, Set<IMeasurementMNode<?>>> tagVal2LeafMNodeSet;
509
    Set<IMeasurementMNode<?>> nodeSet;
510
    for (Map.Entry<String, String> entry : deleteTag.entrySet()) {
1✔
511
      String key = entry.getKey();
1✔
512
      String value = entry.getValue();
1✔
513
      // change the tag inverted index map
514
      tagVal2LeafMNodeSet = tagIndex.get(key);
1✔
515
      if (tagVal2LeafMNodeSet != null) {
1✔
516
        nodeSet = tagVal2LeafMNodeSet.get(value);
1✔
517
        if (nodeSet != null) {
1✔
518
          if (logger.isDebugEnabled()) {
1✔
519
            logger.debug(
×
520
                String.format(
×
521
                    String.format(DEBUG_MSG, "Drop" + TAG_FORMAT, leafMNode.getFullPath()),
×
522
                    entry.getKey(),
×
523
                    entry.getValue(),
×
524
                    leafMNode.getOffset()));
×
525
          }
526

527
          nodeSet.remove(leafMNode);
1✔
528
          if (nodeSet.isEmpty()) {
1✔
529
            tagVal2LeafMNodeSet.remove(value);
1✔
530
            if (tagVal2LeafMNodeSet.isEmpty()) {
1✔
531
              tagIndex.remove(key);
1✔
532
            }
533
          }
534
        }
535
      } else {
536
        if (logger.isDebugEnabled()) {
×
537
          logger.debug(
×
538
              String.format(
×
539
                  String.format(DEBUG_MSG_1, "Drop" + PREVIOUS_CONDITION, leafMNode.getFullPath()),
×
540
                  key,
541
                  value,
542
                  leafMNode.getOffset(),
×
543
                  tagIndex.containsKey(key)));
×
544
        }
545
      }
546
    }
1✔
547
  }
1✔
548

549
  /**
550
   * Set/change the values of tags or attributes.
551
   *
552
   * @param alterMap the new tags or attributes key-value
553
   * @throws MetadataException tagLogFile write error or tags/attributes do not exist
554
   * @throws IOException error occurred when reading disk
555
   */
556
  public void setTagsOrAttributesValue(
557
      Map<String, String> alterMap, PartialPath fullPath, IMeasurementMNode<?> leafMNode)
558
      throws MetadataException, IOException {
559
    // tags, attributes
560
    Pair<Map<String, String>, Map<String, String>> pair =
1✔
561
        tagLogFile.read(COMMON_CONFIG.getTagAttributeTotalSize(), leafMNode.getOffset());
1✔
562
    Map<String, String> oldTagValue = new HashMap<>();
1✔
563
    Map<String, String> newTagValue = new HashMap<>();
1✔
564

565
    for (Map.Entry<String, String> entry : alterMap.entrySet()) {
1✔
566
      String key = entry.getKey();
1✔
567
      String value = entry.getValue();
1✔
568
      // check tag map
569
      if (pair.left.containsKey(key)) {
1✔
570
        oldTagValue.put(key, pair.left.get(key));
1✔
571
        newTagValue.put(key, value);
1✔
572
        pair.left.put(key, value);
1✔
573
      } else if (pair.right.containsKey(key)) {
1✔
574
        // check attribute map
575
        pair.right.put(key, value);
1✔
576
      } else {
577
        throw new MetadataException(
1✔
578
            String.format("TimeSeries [%s] does not have tag/attribute [%s].", fullPath, key),
1✔
579
            true);
580
      }
581
    }
1✔
582

583
    // persist the change to disk
584
    tagLogFile.write(pair.left, pair.right, leafMNode.getOffset());
1✔
585

586
    for (Map.Entry<String, String> entry : oldTagValue.entrySet()) {
1✔
587
      String key = entry.getKey();
1✔
588
      String beforeValue = entry.getValue();
1✔
589
      String currentValue = newTagValue.get(key);
1✔
590
      // change the tag inverted index map
591
      if (tagIndex.containsKey(key) && tagIndex.get(key).containsKey(beforeValue)) {
1✔
592

593
        if (logger.isDebugEnabled()) {
1✔
594
          logger.debug(
×
595
              String.format(
×
596
                  String.format(DEBUG_MSG, "Set" + TAG_FORMAT, leafMNode.getFullPath()),
×
597
                  entry.getKey(),
×
598
                  beforeValue,
599
                  leafMNode.getOffset()));
×
600
        }
601

602
        tagIndex.get(key).get(beforeValue).remove(leafMNode);
1✔
603
      } else {
604
        if (logger.isDebugEnabled()) {
×
605
          logger.debug(
×
606
              String.format(
×
607
                  String.format(DEBUG_MSG_1, "Set" + PREVIOUS_CONDITION, leafMNode.getFullPath()),
×
608
                  key,
609
                  beforeValue,
610
                  leafMNode.getOffset(),
×
611
                  tagIndex.containsKey(key)));
×
612
        }
613
      }
614
      addIndex(key, currentValue, leafMNode);
1✔
615
    }
1✔
616
  }
1✔
617

618
  /**
619
   * Rename the tag or attribute's key of the timeseries.
620
   *
621
   * @param oldKey old key of tag or attribute
622
   * @param newKey new key of tag or attribute
623
   * @throws MetadataException tagLogFile write error or does not have tag/attribute or already has
624
   *     a tag/attribute named newKey
625
   * @throws IOException error occurred when reading disk
626
   */
627
  public void renameTagOrAttributeKey(
628
      String oldKey, String newKey, PartialPath fullPath, IMeasurementMNode<?> leafMNode)
629
      throws MetadataException, IOException {
630
    // tags, attributes
631
    Pair<Map<String, String>, Map<String, String>> pair =
1✔
632
        tagLogFile.read(COMMON_CONFIG.getTagAttributeTotalSize(), leafMNode.getOffset());
1✔
633

634
    // current name has existed
635
    if (pair.left.containsKey(newKey) || pair.right.containsKey(newKey)) {
1✔
636
      throw new MetadataException(
1✔
637
          String.format(
1✔
638
              "TimeSeries [%s] already has a tag/attribute named [%s].", fullPath, newKey),
639
          true);
640
    }
641

642
    // check tag map
643
    if (pair.left.containsKey(oldKey)) {
1✔
644
      String value = pair.left.remove(oldKey);
1✔
645
      pair.left.put(newKey, value);
1✔
646
      // persist the change to disk
647
      tagLogFile.write(pair.left, pair.right, leafMNode.getOffset());
1✔
648
      // change the tag inverted index map
649
      if (tagIndex.containsKey(oldKey) && tagIndex.get(oldKey).containsKey(value)) {
1✔
650

651
        if (logger.isDebugEnabled()) {
1✔
652
          logger.debug(
×
653
              String.format(
×
654
                  String.format(DEBUG_MSG, "Rename" + TAG_FORMAT, leafMNode.getFullPath()),
×
655
                  oldKey,
656
                  value,
657
                  leafMNode.getOffset()));
×
658
        }
659

660
        tagIndex.get(oldKey).get(value).remove(leafMNode);
1✔
661

662
      } else {
663
        if (logger.isDebugEnabled()) {
×
664
          logger.debug(
×
665
              String.format(
×
666
                  String.format(
×
667
                      DEBUG_MSG_1, "Rename" + PREVIOUS_CONDITION, leafMNode.getFullPath()),
×
668
                  oldKey,
669
                  value,
670
                  leafMNode.getOffset(),
×
671
                  tagIndex.containsKey(oldKey)));
×
672
        }
673
      }
674
      addIndex(newKey, value, leafMNode);
1✔
675
    } else if (pair.right.containsKey(oldKey)) {
1✔
676
      // check attribute map
677
      pair.right.put(newKey, pair.right.remove(oldKey));
1✔
678
      // persist the change to disk
679
      tagLogFile.write(pair.left, pair.right, leafMNode.getOffset());
1✔
680
    } else {
681
      throw new MetadataException(
1✔
682
          String.format("TimeSeries [%s] does not have tag/attribute [%s].", fullPath, oldKey),
1✔
683
          true);
684
    }
685
  }
1✔
686

687
  public long writeTagFile(Map<String, String> tags, Map<String, String> attributes)
688
      throws MetadataException, IOException {
689
    return tagLogFile.write(tags, attributes);
1✔
690
  }
691

692
  public Pair<Map<String, String>, Map<String, String>> readTagFile(long tagFileOffset)
693
      throws IOException {
694
    return tagLogFile.read(COMMON_CONFIG.getTagAttributeTotalSize(), tagFileOffset);
1✔
695
  }
696

697
  /**
698
   * Read the tags of this node.
699
   *
700
   * @param node the node to query.
701
   * @return the tag key-value map.
702
   * @throws RuntimeException If any IOException happens.
703
   */
704
  public Map<String, String> readTags(IMeasurementMNode<?> node) {
705
    try {
706
      return readTagFile(node.getOffset()).getLeft();
1✔
707
    } catch (IOException e) {
×
708
      throw new RuntimeException(e);
×
709
    }
710
  }
711

712
  public void clear() throws IOException {
713
    this.tagIndex.clear();
1✔
714
    if (tagLogFile != null) {
1✔
715
      tagLogFile.close();
1✔
716
      tagLogFile = null;
1✔
717
    }
718
  }
1✔
719
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc