• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

box / box-java-sdk / #5935

18 Dec 2025 06:48PM UTC coverage: 12.907% (+0.01%) from 12.895%
#5935

Pull #1651

github

web-flow
Merge d94e46200 into 492684e45
Pull Request #1651: test(boxsdkgen): Update Metadata Taxonomies tests (box/box-codegen#909)

0 of 8 new or added lines in 3 files covered. (0.0%)

17 existing lines in 10 files now uncovered.

8374 of 64880 relevant lines covered (12.91%)

0.13 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.46
/src/main/java/com/box/sdk/EventStream.java
1
package com.box.sdk;
2

3
import com.eclipsesource.json.Json;
4
import com.eclipsesource.json.JsonArray;
5
import com.eclipsesource.json.JsonObject;
6
import com.eclipsesource.json.JsonValue;
7
import java.util.ArrayList;
8
import java.util.Collection;
9

10
/**
11
 * Receives real-time events from the API and forwards them to {@link EventListener EventListeners}.
12
 *
13
 * <p>This class handles long polling the Box events endpoint in order to receive real-time user
14
 * events. When an EventStream is started, it begins long polling on a separate thread until the
15
 * {@link #stop} method is called. Since the API may return duplicate events, EventStream also
16
 * maintains a small cache of the most recently received event IDs in order to automatically
17
 * deduplicate events.
18
 *
19
 * <p>Note: Enterprise Events can be accessed by admin users with the EventLog.getEnterpriseEvents
20
 * method
21
 */
22
public class EventStream {
23

24
  private static final int LIMIT = 800;
25
  /** Events URL. */
26
  public static final URLTemplate EVENT_URL =
1✔
27
      new URLTemplate("events?limit=" + LIMIT + "&stream_position=%s");
28

29
  private static final int STREAM_POSITION_NOW = -1;
30
  private static final int DEFAULT_POLLING_DELAY = 1000;
31
  private final BoxAPIConnection api;
32
  private final long startingPosition;
33
  private final int pollingDelay;
34
  private final Collection<EventListener> listeners;
35
  private final Object listenerLock;
36

37
  private LRUCache<String> receivedEvents;
38
  private boolean started;
39
  private Poller poller;
40
  private Thread pollerThread;
41

42
  /**
   * Creates an EventStream bound to the given API connection, using the "now" starting position
   * and the default polling delay.
   *
   * @param api the API connection to use.
   */
  public EventStream(BoxAPIConnection api) {
    // Delegate to the full constructor with both defaults filled in.
    this(api, STREAM_POSITION_NOW, DEFAULT_POLLING_DELAY);
  }
1✔
50

51
  /**
   * Creates an EventStream bound to the given API connection that begins at an explicit stream
   * position and uses the default polling delay.
   *
   * @param api the API connection to use.
   * @param startingPosition the starting position of the event stream.
   */
  public EventStream(BoxAPIConnection api, long startingPosition) {
    // Only the polling delay is defaulted here.
    this(api, startingPosition, DEFAULT_POLLING_DELAY);
  }
×
60

61
  /**
   * Constructs an EventStream using an API connection and a starting initial position with custom
   * polling delay.
   *
   * @param api the API connection to use.
   * @param startingPosition the starting position of the event stream.
   * @param pollingDelay the delay in milliseconds between successive calls to get more events.
   * @throws IllegalArgumentException if {@code pollingDelay} is negative.
   */
  public EventStream(BoxAPIConnection api, long startingPosition, int pollingDelay) {
    // The delay is handed to Thread.sleep on the poller thread, which rejects negative values.
    // Fail here, at the call site, instead of killing the poller after its first batch.
    if (pollingDelay < 0) {
      throw new IllegalArgumentException(
          "pollingDelay must be non-negative, but was " + pollingDelay);
    }

    this.api = api;
    this.startingPosition = startingPosition;
    this.listeners = new ArrayList<>();
    this.listenerLock = new Object();
    this.pollingDelay = pollingDelay;
  }
1✔
76

77
  /**
78
   * Adds a listener that will be notified when an event is received.
79
   *
80
   * @param listener the listener to add.
81
   */
82
  public void addListener(EventListener listener) {
83
    synchronized (this.listenerLock) {
1✔
84
      this.listeners.add(listener);
1✔
85
    }
1✔
86
  }
1✔
87

88
  /**
89
   * Indicates whether or not this EventStream has been started.
90
   *
91
   * @return true if this EventStream has been started; otherwise false.
92
   */
93
  public boolean isStarted() {
94
    return this.started;
1✔
95
  }
96

97
  /**
98
   * Stops this EventStream and disconnects from the API.
99
   *
100
   * @throws IllegalStateException if the EventStream is already stopped.
101
   */
102
  public void stop() {
103
    if (!this.started) {
1✔
104
      throw new IllegalStateException("Cannot stop the EventStream because it isn't started.");
1✔
105
    }
106

107
    this.started = false;
1✔
108
    this.pollerThread.interrupt();
1✔
109
  }
1✔
110

111
  /**
112
   * Starts this EventStream and begins long polling the API.
113
   *
114
   * @throws IllegalStateException if the EventStream is already started.
115
   */
116
  public void start() {
117
    if (this.started) {
1✔
118
      throw new IllegalStateException("Cannot start the EventStream because it isn't stopped.");
×
119
    }
120

121
    final long initialPosition;
122

123
    if (this.startingPosition == STREAM_POSITION_NOW) {
1✔
124
      BoxJSONRequest request =
1✔
125
          new BoxJSONRequest(this.api, EVENT_URL.buildAlpha(this.api.getBaseURL(), "now"), "GET");
1✔
126
      try (BoxJSONResponse response = request.send()) {
1✔
127
        JsonObject jsonObject = Json.parse(response.getJSON()).asObject();
1✔
128
        initialPosition = jsonObject.get("next_stream_position").asLong();
1✔
129
      }
130
    } else {
1✔
131
      assert this.startingPosition >= 0 : "Starting position must be non-negative";
×
132
      initialPosition = this.startingPosition;
×
133
    }
134

135
    this.poller = new Poller(initialPosition);
1✔
136

137
    this.pollerThread = new Thread(this.poller);
1✔
138
    this.pollerThread.setUncaughtExceptionHandler((t, e) -> EventStream.this.notifyException(e));
1✔
139
    this.pollerThread.start();
1✔
140

141
    this.started = true;
1✔
142
  }
1✔
143

144
  /**
145
   * Indicates whether or not an event ID is a duplicate.
146
   *
147
   * <p>This method can be overridden by a subclass in order to provide custom de-duping logic.
148
   *
149
   * @param eventID the event ID.
150
   * @return true if the event is a duplicate; otherwise false.
151
   */
152
  protected boolean isDuplicate(String eventID) {
153
    if (this.receivedEvents == null) {
1✔
154
      this.receivedEvents = new LRUCache<>();
1✔
155
    }
156

157
    return !this.receivedEvents.add(eventID);
1✔
158
  }
159

160
  private void notifyNextPosition(long position) {
161
    synchronized (this.listenerLock) {
1✔
162
      for (EventListener listener : this.listeners) {
1✔
163
        listener.onNextPosition(position);
1✔
164
      }
1✔
165
    }
1✔
166
  }
1✔
167

168
  private void notifyEvent(BoxEvent event) {
169
    synchronized (this.listenerLock) {
1✔
170
      boolean isDuplicate = this.isDuplicate(event.getID());
1✔
171
      if (!isDuplicate) {
1✔
172
        for (EventListener listener : this.listeners) {
1✔
173
          listener.onEvent(event);
1✔
174
        }
1✔
175
      }
176
    }
1✔
177
  }
1✔
178

179
  private void notifyException(Throwable e) {
180
    if (e instanceof InterruptedException && !this.started) {
1✔
181
      return;
×
182
    }
183

184
    this.stop();
1✔
185
    synchronized (this.listenerLock) {
1✔
186
      for (EventListener listener : this.listeners) {
1✔
187
        if (listener.onException(e)) {
1✔
188
          return;
×
189
        }
190
      }
1✔
191
    }
1✔
192
  }
1✔
193

194
  private class Poller implements Runnable {
195
    private final long initialPosition;
196

197
    private RealtimeServerConnection server;
198

199
    Poller(long initialPosition) {
1✔
200
      this.initialPosition = initialPosition;
1✔
201
      this.server = new RealtimeServerConnection(EventStream.this.api);
1✔
202
    }
1✔
203

204
    @Override
205
    public void run() {
206
      long position = this.initialPosition;
1✔
207
      while (!Thread.interrupted()) {
1✔
208
        if (this.server.getRemainingRetries() == 0) {
1✔
209
          this.server = new RealtimeServerConnection(EventStream.this.api);
×
210
        }
211

212
        if (this.server.waitForChange(position)) {
1✔
213
          if (Thread.interrupted()) {
1✔
214
            return;
×
215
          }
216

217
          BoxJSONRequest request =
1✔
218
              new BoxJSONRequest(
219
                  EventStream.this.api,
1✔
220
                  EVENT_URL.buildAlpha(EventStream.this.api.getBaseURL(), position),
1✔
221
                  "GET");
222
          try (BoxJSONResponse response = request.send()) {
1✔
223
            JsonObject jsonObject = Json.parse(response.getJSON()).asObject();
1✔
224
            JsonArray entriesArray = jsonObject.get("entries").asArray();
1✔
225
            for (JsonValue entry : entriesArray) {
1✔
226
              BoxEvent event = new BoxEvent(EventStream.this.api, entry.asObject());
1✔
227
              EventStream.this.notifyEvent(event);
1✔
228
            }
1✔
229
            position = jsonObject.get("next_stream_position").asLong();
1✔
230
            EventStream.this.notifyNextPosition(position);
1✔
231
            try {
232
              // Delay re-polling to avoid making too many API calls
233
              // Since duplicate events may appear in the stream, without any delay added
234
              // the stream can make 3-5 requests per second and not produce any new
235
              // events.  A short delay between calls balances latency for new events
236
              // and the risk of hitting rate limits.
237
              Thread.sleep(EventStream.this.pollingDelay);
1✔
238
            } catch (InterruptedException ex) {
×
239
              return;
×
240
            }
1✔
UNCOV
241
          }
×
242
        }
1✔
243
      }
244
    }
×
245
  }
246
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc