• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

box / box-java-sdk / #3921

12 Jul 2024 02:36PM CUT coverage: 72.443% (+0.005%) from 72.438%
#3921

Pull #1257

github

web-flow
Merge bf231fe94 into f08844889
Pull Request #1257: feat: Allow overriding creation of OkHttp Call

2 of 2 new or added lines in 1 file covered. (100.0%)

3 existing lines in 3 files now uncovered.

7684 of 10607 relevant lines covered (72.44%)

0.72 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.46
/src/main/java/com/box/sdk/EventStream.java
1
package com.box.sdk;
2

3
import com.eclipsesource.json.Json;
4
import com.eclipsesource.json.JsonArray;
5
import com.eclipsesource.json.JsonObject;
6
import com.eclipsesource.json.JsonValue;
7
import java.util.ArrayList;
8
import java.util.Collection;
9

10
/**
11
 * Receives real-time events from the API and forwards them to {@link EventListener EventListeners}.
12
 *
13
 * <p>This class handles long polling the Box events endpoint in order to receive real-time user events.
14
 * When an EventStream is started, it begins long polling on a separate thread until the {@link #stop} method
15
 * is called.
16
 * Since the API may return duplicate events, EventStream also maintains a small cache of the most recently received
17
 * event IDs in order to automatically deduplicate events.</p>
18
 * <p>Note: Enterprise Events can be accessed by admin users with the EventLog.getEnterpriseEvents method</p>
19
 */
20
public class EventStream {
1✔
21

22
    private static final int LIMIT = 800;
23
    /**
24
     * Events URL.
25
     */
26
    public static final URLTemplate EVENT_URL = new URLTemplate("events?limit=" + LIMIT + "&stream_position=%s");
1✔
27
    private static final int STREAM_POSITION_NOW = -1;
28
    private static final int DEFAULT_POLLING_DELAY = 1000;
29
    private final BoxAPIConnection api;
30
    private final long startingPosition;
31
    private final int pollingDelay;
32
    private final Collection<EventListener> listeners;
33
    private final Object listenerLock;
34

35
    private LRUCache<String> receivedEvents;
36
    private boolean started;
37
    private Poller poller;
38
    private Thread pollerThread;
39

40
    /**
41
     * Constructs an EventStream using an API connection.
42
     *
43
     * @param api the API connection to use.
44
     */
45
    public EventStream(BoxAPIConnection api) {
46
        this(api, STREAM_POSITION_NOW, DEFAULT_POLLING_DELAY);
1✔
47
    }
1✔
48

49
    /**
50
     * Constructs an EventStream using an API connection and a starting initial position.
51
     *
52
     * @param api              the API connection to use.
53
     * @param startingPosition the starting position of the event stream.
54
     */
55
    public EventStream(BoxAPIConnection api, long startingPosition) {
56
        this(api, startingPosition, DEFAULT_POLLING_DELAY);
×
57
    }
×
58

59
    /**
60
     * Constructs an EventStream using an API connection and a starting initial position with custom polling delay.
61
     *
62
     * @param api              the API connection to use.
63
     * @param startingPosition the starting position of the event stream.
64
     * @param pollingDelay     the delay in milliseconds between successive calls to get more events.
65
     */
66
    public EventStream(BoxAPIConnection api, long startingPosition, int pollingDelay) {
1✔
67
        this.api = api;
1✔
68
        this.startingPosition = startingPosition;
1✔
69
        this.listeners = new ArrayList<>();
1✔
70
        this.listenerLock = new Object();
1✔
71
        this.pollingDelay = pollingDelay;
1✔
72
    }
1✔
73

74
    /**
75
     * Adds a listener that will be notified when an event is received.
76
     *
77
     * @param listener the listener to add.
78
     */
79
    public void addListener(EventListener listener) {
80
        synchronized (this.listenerLock) {
1✔
81
            this.listeners.add(listener);
1✔
82
        }
1✔
83
    }
1✔
84

85
    /**
86
     * Indicates whether or not this EventStream has been started.
87
     *
88
     * @return true if this EventStream has been started; otherwise false.
89
     */
90
    public boolean isStarted() {
91
        return this.started;
1✔
92
    }
93

94
    /**
95
     * Stops this EventStream and disconnects from the API.
96
     *
97
     * @throws IllegalStateException if the EventStream is already stopped.
98
     */
99
    public void stop() {
100
        if (!this.started) {
1✔
101
            throw new IllegalStateException("Cannot stop the EventStream because it isn't started.");
1✔
102
        }
103

104
        this.started = false;
1✔
105
        this.pollerThread.interrupt();
1✔
106
    }
1✔
107

108
    /**
109
     * Starts this EventStream and begins long polling the API.
110
     *
111
     * @throws IllegalStateException if the EventStream is already started.
112
     */
113
    public void start() {
114
        if (this.started) {
1✔
115
            throw new IllegalStateException("Cannot start the EventStream because it isn't stopped.");
×
116
        }
117

118
        final long initialPosition;
119

120
        if (this.startingPosition == STREAM_POSITION_NOW) {
1✔
121
            BoxJSONRequest request = new BoxJSONRequest(this.api,
1✔
122
                EVENT_URL.buildAlpha(this.api.getBaseURL(), "now"), "GET"
1✔
123
            );
124
            try (BoxJSONResponse response = request.send()) {
1✔
125
                JsonObject jsonObject = Json.parse(response.getJSON()).asObject();
1✔
126
                initialPosition = jsonObject.get("next_stream_position").asLong();
1✔
127
            }
128
        } else {
1✔
129
            assert this.startingPosition >= 0 : "Starting position must be non-negative";
×
130
            initialPosition = this.startingPosition;
×
131
        }
132

133
        this.poller = new Poller(initialPosition);
1✔
134

135
        this.pollerThread = new Thread(this.poller);
1✔
136
        this.pollerThread.setUncaughtExceptionHandler((t, e) -> EventStream.this.notifyException(e));
1✔
137
        this.pollerThread.start();
1✔
138

139
        this.started = true;
1✔
140
    }
1✔
141

142
    /**
143
     * Indicates whether or not an event ID is a duplicate.
144
     *
145
     * <p>This method can be overridden by a subclass in order to provide custom de-duping logic.</p>
146
     *
147
     * @param eventID the event ID.
148
     * @return true if the event is a duplicate; otherwise false.
149
     */
150
    protected boolean isDuplicate(String eventID) {
151
        if (this.receivedEvents == null) {
1✔
152
            this.receivedEvents = new LRUCache<>();
1✔
153
        }
154

155
        return !this.receivedEvents.add(eventID);
1✔
156
    }
157

158
    private void notifyNextPosition(long position) {
159
        synchronized (this.listenerLock) {
1✔
160
            for (EventListener listener : this.listeners) {
1✔
161
                listener.onNextPosition(position);
1✔
162
            }
1✔
163
        }
1✔
164
    }
1✔
165

166
    private void notifyEvent(BoxEvent event) {
167
        synchronized (this.listenerLock) {
1✔
168
            boolean isDuplicate = this.isDuplicate(event.getID());
1✔
169
            if (!isDuplicate) {
1✔
170
                for (EventListener listener : this.listeners) {
1✔
171
                    listener.onEvent(event);
1✔
172
                }
1✔
173
            }
174
        }
1✔
175
    }
1✔
176

177
    private void notifyException(Throwable e) {
178
        if (e instanceof InterruptedException && !this.started) {
1✔
179
            return;
×
180
        }
181

182
        this.stop();
1✔
183
        synchronized (this.listenerLock) {
1✔
184
            for (EventListener listener : this.listeners) {
1✔
185
                if (listener.onException(e)) {
1✔
186
                    return;
×
187
                }
188
            }
1✔
189
        }
1✔
190
    }
1✔
191

192
    private class Poller implements Runnable {
193
        private final long initialPosition;
194

195
        private RealtimeServerConnection server;
196

197
        Poller(long initialPosition) {
1✔
198
            this.initialPosition = initialPosition;
1✔
199
            this.server = new RealtimeServerConnection(EventStream.this.api);
1✔
200
        }
1✔
201

202
        @Override
203
        public void run() {
204
            long position = this.initialPosition;
1✔
205
            while (!Thread.interrupted()) {
1✔
206
                if (this.server.getRemainingRetries() == 0) {
1✔
207
                    this.server = new RealtimeServerConnection(EventStream.this.api);
×
208
                }
209

210
                if (this.server.waitForChange(position)) {
1✔
211
                    if (Thread.interrupted()) {
1✔
212
                        return;
×
213
                    }
214

215
                    BoxJSONRequest request = new BoxJSONRequest(EventStream.this.api,
1✔
216
                        EVENT_URL.buildAlpha(EventStream.this.api.getBaseURL(), position), "GET"
1✔
217
                    );
218
                    try (BoxJSONResponse response = request.send()) {
1✔
219
                        JsonObject jsonObject = Json.parse(response.getJSON()).asObject();
1✔
220
                        JsonArray entriesArray = jsonObject.get("entries").asArray();
1✔
221
                        for (JsonValue entry : entriesArray) {
1✔
222
                            BoxEvent event = new BoxEvent(EventStream.this.api, entry.asObject());
1✔
223
                            EventStream.this.notifyEvent(event);
1✔
224
                        }
1✔
225
                        position = jsonObject.get("next_stream_position").asLong();
1✔
226
                        EventStream.this.notifyNextPosition(position);
1✔
227
                        try {
228
                            // Delay re-polling to avoid making too many API calls
229
                            // Since duplicate events may appear in the stream, without any delay added
230
                            // the stream can make 3-5 requests per second and not produce any new
231
                            // events.  A short delay between calls balances latency for new events
232
                            // and the risk of hitting rate limits.
233
                            Thread.sleep(EventStream.this.pollingDelay);
1✔
234
                        } catch (InterruptedException ex) {
×
235
                            return;
×
236
                        }
1✔
UNCOV
237
                    }
×
238
                }
1✔
239
            }
240
        }
×
241
    }
242
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc