• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / incubator-omid / 277

pending completion
277

push

travis-ci-com

web-flow
Upgrade Netty to 4.1.86.Final (#125)

Co-authored-by: Rajeshbabu Chintaguntla <rajeshbabu@apache.org>

1723 of 4847 relevant lines covered (35.55%)

285709.66 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.22
/tso-server/src/main/java/org/apache/omid/tso/WorldClockOracleImpl.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *   http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing, software
13
 * distributed under the License is distributed on an "AS IS" BASIS,
14
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15
 * See the License for the specific language governing permissions and
16
 * limitations under the License.
17
 */
18
package org.apache.omid.tso;
19

20
import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
21
import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
22

23
import org.apache.omid.committable.CommitTable;
24
import org.apache.omid.metrics.Gauge;
25
import org.apache.omid.metrics.MetricsRegistry;
26
import org.apache.omid.timestamp.storage.TimestampStorage;
27
import org.slf4j.Logger;
28
import org.slf4j.LoggerFactory;
29

30
import javax.inject.Inject;
31
import javax.inject.Singleton;
32

33
import java.io.IOException;
34
import java.util.concurrent.Executors;
35
import java.util.concurrent.ScheduledExecutorService;
36
import java.util.concurrent.TimeUnit;
37

38
import static org.apache.omid.metrics.MetricsUtils.name;
39

40
/**
41
 * The Timestamp Oracle that gives monotonically increasing timestamps based on world time
42
 */
43
@Singleton
8✔
44
public class WorldClockOracleImpl implements TimestampOracle {
76✔
45

46
    private static final Logger LOG = LoggerFactory.getLogger(WorldClockOracleImpl.class);
8✔
47

48
    static final long MAX_TX_PER_MS = 1_000_000; // 1 million
49
    static final long TIMESTAMP_INTERVAL_MS = 10_000; // 10 seconds interval
50
    private static final long TIMESTAMP_ALLOCATION_INTERVAL_MS = 7_000; // 7 seconds
51

52
    private long lastTimestamp;
53
    private long maxTimestamp;
54

55
    private TimestampStorage storage;
56
    private Panicker panicker;
57

58
    private volatile long maxAllocatedTime;
59

60
    private final ScheduledExecutorService scheduler =
18✔
61
            Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder().setNameFormat("ts-persist-%d").build());
18✔
62

63
    private Runnable allocateTimestampsBatchTask;
64

65
    private class AllocateTimestampBatchTask implements Runnable {
66
        long previousMaxTime;
67

68
        AllocateTimestampBatchTask(long previousMaxTime) {
20✔
69
            this.previousMaxTime = previousMaxTime;
20✔
70
        }
20✔
71

72
        @Override
73
        public void run() {
74
            long newMaxTime = (System.currentTimeMillis() + TIMESTAMP_INTERVAL_MS) * MAX_TX_PER_MS;
38✔
75
            try {
76
                storage.updateMaxTimestamp(previousMaxTime, newMaxTime);
38✔
77
                maxAllocatedTime = newMaxTime;
36✔
78
                previousMaxTime = newMaxTime;
36✔
79
            } catch (Throwable e) {
2✔
80
                panicker.panic("Can't store the new max timestamp", e);
2✔
81
            }
36✔
82
        }
38✔
83
    }
84

85
    @Inject
86
    public WorldClockOracleImpl(MetricsRegistry metrics,
87
                               TimestampStorage tsStorage,
88
                               Panicker panicker) throws IOException {
18✔
89

90
        this.storage = tsStorage;
18✔
91
        this.panicker = panicker;
18✔
92

93
        metrics.gauge(name("tso", "maxTimestamp"), new Gauge<Long>() {
36✔
94
            @Override
95
            public Long getValue() {
96
                return maxTimestamp;
×
97
            }
98
        });
99

100
    }
18✔
101

102
    @Override
103
    public void initialize() throws IOException {
104

105
        this.lastTimestamp = this.maxTimestamp = storage.getMaxTimestamp();
20✔
106

107
        this.allocateTimestampsBatchTask = new AllocateTimestampBatchTask(lastTimestamp);
20✔
108

109
        // Trigger first allocation of timestamps
110
        scheduler.schedule(allocateTimestampsBatchTask, 0, TimeUnit.MILLISECONDS);
20✔
111

112
        // Waiting for the current epoch to start. Occurs in case of failover when the previous TSO allocated the current time frame.
113
        while ((System.currentTimeMillis() * MAX_TX_PER_MS) < this.lastTimestamp) {
20✔
114
            try {
115
                Thread.sleep(1000);
×
116
            } catch (InterruptedException e) {
×
117
               continue;
×
118
            }
×
119
        }
120

121
        // Launch the periodic timestamp interval allocation. In this case, the timestamp interval is extended even though the TSO is idle.
122
        // Because we are world time based, this guarantees that the first request after a long time does not need to wait for new interval allocation.
123
        scheduler.scheduleAtFixedRate(allocateTimestampsBatchTask, TIMESTAMP_ALLOCATION_INTERVAL_MS, TIMESTAMP_ALLOCATION_INTERVAL_MS, TimeUnit.MILLISECONDS);
20✔
124
    }
20✔
125

126
    /**
127
     * Returns the next timestamp if available. Otherwise spins till the ts-persist thread allocates a new timestamp.
128
     */
129
    @Override
130
    public long next() {
131

132
        long currentMsFirstTimestamp = System.currentTimeMillis() * MAX_TX_PER_MS;
148✔
133

134
        lastTimestamp += CommitTable.MAX_CHECKPOINTS_PER_TXN;
148✔
135

136
        // Return the next timestamp in case we are still in the same millisecond as the previous timestamp was. 
137
        if (lastTimestamp >= currentMsFirstTimestamp) {
148✔
138
            return lastTimestamp;
×
139
        }
140

141
        if (currentMsFirstTimestamp >= maxTimestamp) { // Intentional race to reduce synchronization overhead in every access to maxTimestamp                                                                                                                       
148✔
142
            while (maxAllocatedTime <= currentMsFirstTimestamp) { // Waiting for the interval allocation
24✔
143
                try {
144
                    Thread.sleep(1000);
2✔
145
                } catch (InterruptedException e) {
×
146
                   continue;
×
147
                }
2✔
148
            }
149
            assert (maxAllocatedTime > maxTimestamp);
22✔
150
            maxTimestamp = maxAllocatedTime;
22✔
151
        }
152

153
        lastTimestamp = currentMsFirstTimestamp;
148✔
154

155
        return lastTimestamp;
148✔
156
    }
157

158
    @Override
159
    public long getLast() {
160
        return lastTimestamp;
18✔
161
    }
162

163
    @Override
164
    public String toString() {
165
        return String.format("TimestampOracle -> LastTimestamp: %d, MaxTimestamp: %d", lastTimestamp, maxTimestamp);
×
166
    }
167

168
    @VisibleForTesting
169
    static class InMemoryTimestampStorage implements TimestampStorage {
×
170

171
        long maxTime = 0;
×
172

173
        @Override
174
        public void updateMaxTimestamp(long previousMaxTime, long nextMaxTime) {
175
            maxTime = nextMaxTime;
×
176
            LOG.info("Updating max timestamp: (previous:{}, new:{})", previousMaxTime, nextMaxTime);
×
177
        }
×
178

179
        @Override
180
        public long getMaxTimestamp() {
181
            return maxTime;
×
182
        }
183

184
    }
185
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc