• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

box / box-java-sdk / #4766

04 Sep 2025 02:54PM UTC coverage: 37.239% (-1.6%) from 38.858%
#4766

push

github

web-flow
feat: Support event with long polling (box/box-codegen#807) (#1409)

32 of 420 new or added lines in 7 files covered. (7.62%)

800 existing lines in 66 files now uncovered.

18480 of 49626 relevant lines covered (37.24%)

0.37 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

7.57
/src/main/java/com/box/sdkgen/box/eventstream/EventStreamIterator.java
1
package com.box.sdkgen.box.eventstream;
2

3
import com.box.sdkgen.box.errors.BoxSDKError;
4
import com.box.sdkgen.managers.events.EventsManager;
5
import com.box.sdkgen.managers.events.GetEventStreamHeaders;
6
import com.box.sdkgen.managers.events.GetEventStreamQueryParams;
7
import com.box.sdkgen.managers.events.GetEventsHeaders;
8
import com.box.sdkgen.managers.events.GetEventsQueryParams;
9
import com.box.sdkgen.managers.events.GetEventsQueryParamsEventTypeField;
10
import com.box.sdkgen.managers.events.GetEventsQueryParamsStreamTypeField;
11
import com.box.sdkgen.networking.fetchoptions.FetchOptions;
12
import com.box.sdkgen.networking.fetchoptions.ResponseFormat;
13
import com.box.sdkgen.networking.fetchresponse.FetchResponse;
14
import com.box.sdkgen.schemas.event.Event;
15
import com.box.sdkgen.schemas.events.Events;
16
import com.box.sdkgen.schemas.realtimeserver.RealtimeServer;
17
import com.box.sdkgen.schemas.realtimeservers.RealtimeServers;
18
import com.box.sdkgen.serialization.json.Valuable;
19
import com.fasterxml.jackson.databind.JsonNode;
20
import java.util.ArrayList;
21
import java.util.HashMap;
22
import java.util.Iterator;
23
import java.util.List;
24
import java.util.Map;
25
import java.util.NoSuchElementException;
26
import java.util.concurrent.CountDownLatch;
27
import java.util.stream.Collectors;
28

NEW
29
enum RealtimeServerEvent {
×
NEW
30
  NEW_CHANGE("new_change"),
×
NEW
31
  RECONNECT("reconnect");
×
32

33
  private final String value;
34

NEW
35
  RealtimeServerEvent(String value) {
×
NEW
36
    this.value = value;
×
NEW
37
  }
×
38

39
  public String getValue() {
NEW
40
    return value;
×
41
  }
42
}
43

NEW
44
enum EventStreamAction {
×
NEW
45
  FETCH_EVENTS,
×
NEW
46
  RECONNECT,
×
NEW
47
  RETRY,
×
NEW
48
  STOP
×
49
}
50

51
class EventStreamIterator implements Iterator<Event> {
52
  private static final int DEDUPLICATION_SIZE = 1000;
53
  private final EventsManager eventsManager;
54
  private final GetEventStreamQueryParams queryParams;
55
  private final GetEventStreamHeaders headersInput;
56
  private String streamPosition;
57
  private RealtimeServer longPollInfo;
58
  private volatile boolean stopped;
59
  private final CountDownLatch stopLatch;
60
  private final Map<String, Boolean> dedupHash;
61
  private final List<Event> eventQueue;
62
  private boolean started;
63
  private int longPollingRetries;
64
  private static final int DEFAULT_MAX_RETRIES = 10;
65

66
  EventStreamIterator(
67
      EventsManager eventsManager,
68
      GetEventStreamQueryParams queryParams,
69
      GetEventStreamHeaders headersInput) {
1✔
70
    this.eventsManager = eventsManager;
1✔
71
    this.queryParams = queryParams;
1✔
72
    this.headersInput = headersInput;
1✔
73
    this.streamPosition =
1✔
74
        queryParams.getStreamPosition() != null ? queryParams.getStreamPosition() : "now";
1✔
75
    this.longPollInfo = null;
1✔
76
    this.started = false;
1✔
77
    this.stopped = false;
1✔
78
    this.stopLatch = new CountDownLatch(1);
1✔
79
    this.dedupHash = new HashMap<>();
1✔
80
    this.eventQueue = new ArrayList<>();
1✔
81
    this.longPollingRetries = 0;
1✔
82
  }
1✔
83

84
  @Override
85
  public boolean hasNext() {
NEW
86
    return !eventQueue.isEmpty() || (!stopped && stopLatch.getCount() > 0);
×
87
  }
88

89
  @Override
90
  public Event next() {
NEW
91
    if (!hasNext()) {
×
NEW
92
      stop();
×
NEW
93
      throw new NoSuchElementException("No more events available or stream stopped");
×
94
    }
NEW
95
    if (!eventQueue.isEmpty()) {
×
NEW
96
      return eventQueue.remove(0);
×
97
    }
NEW
98
    if (!started) {
×
NEW
99
      started = true;
×
NEW
100
      fetchEvents();
×
NEW
101
      if (!eventQueue.isEmpty()) {
×
NEW
102
        return eventQueue.remove(0);
×
103
      }
104
    }
105

NEW
106
    getLongPollInfo();
×
NEW
107
    while (!stopped && stopLatch.getCount() > 0) {
×
108
      try {
NEW
109
        EventStreamAction action = doLongPoll();
×
NEW
110
        switch (action) {
×
111
          case FETCH_EVENTS:
NEW
112
            fetchEvents();
×
NEW
113
            if (!eventQueue.isEmpty()) {
×
NEW
114
              return eventQueue.remove(0);
×
115
            }
116
            break;
117
          case RETRY:
118
            try {
NEW
119
              Thread.sleep(5000);
×
NEW
120
            } catch (InterruptedException ie) {
×
NEW
121
              Thread.currentThread().interrupt();
×
NEW
122
            }
×
NEW
123
            continue;
×
124
          case RECONNECT:
NEW
125
            getLongPollInfo();
×
NEW
126
            continue;
×
127
          case STOP:
NEW
128
            stop();
×
129
            break;
130
        }
NEW
131
      } catch (Exception e) {
×
NEW
132
        boolean isBoxError = e instanceof BoxSDKError;
×
NEW
133
        if (!isBoxError) {
×
NEW
134
          throw e;
×
135
        }
NEW
136
      }
×
137
    }
NEW
138
    throw new NoSuchElementException("Stream stopped");
×
139
  }
140

141
  public void stop() {
NEW
142
    if (!stopped) {
×
NEW
143
      this.stopped = true;
×
NEW
144
      this.stopLatch.countDown();
×
145
    }
NEW
146
  }
×
147

148
  private void getLongPollInfo() throws BoxSDKError {
NEW
149
    if (stopped || stopLatch.getCount() == 0) {
×
NEW
150
      return;
×
151
    }
152

153
    try {
NEW
154
      RealtimeServers info = eventsManager.getEventsWithLongPolling();
×
NEW
155
      RealtimeServer server = null;
×
NEW
156
      if (info.getEntries() != null) {
×
NEW
157
        for (RealtimeServer entry : info.getEntries()) {
×
NEW
158
          if (entry.getType().equals("realtime_server")) {
×
NEW
159
            server = entry;
×
NEW
160
            break;
×
161
          }
NEW
162
        }
×
163
      }
164

NEW
165
      if (server == null) {
×
NEW
166
        throw new BoxSDKError("No realtime server found in the response.");
×
167
      }
168

NEW
169
      this.longPollInfo = server;
×
NEW
170
      this.longPollingRetries = 0;
×
171

NEW
172
    } catch (Exception e) {
×
NEW
173
      if (!stopped && stopLatch.getCount() > 0) {
×
NEW
174
        throw new BoxSDKError(
×
NEW
175
            "Failed to fetch long polling info: " + (e.getMessage() != null ? e.getMessage() : ""),
×
176
            e);
177
      }
NEW
178
    }
×
NEW
179
  }
×
180

181
  private EventStreamAction doLongPoll() throws BoxSDKError {
NEW
182
    if (stopped || stopLatch.getCount() == 0) {
×
NEW
183
      return EventStreamAction.STOP;
×
184
    }
185

186
    try {
NEW
187
      int maxRetries =
×
NEW
188
          (longPollInfo != null && longPollInfo.getMaxRetries() != null)
×
NEW
189
              ? Integer.parseInt(longPollInfo.getMaxRetries())
×
190
              : DEFAULT_MAX_RETRIES;
191

NEW
192
      if (longPollInfo == null || longPollingRetries > maxRetries) {
×
NEW
193
        getLongPollInfo();
×
194
      }
195

NEW
196
      longPollingRetries++;
×
197

NEW
198
      String longPollUrl = longPollInfo.getUrl();
×
NEW
199
      String separator = longPollUrl.contains("?") ? "&" : "?";
×
NEW
200
      String longPollWithStreamPosition =
×
201
          longPollUrl + separator + "stream_position=" + streamPosition;
NEW
202
      Map<String, String> headers = new HashMap<>();
×
NEW
203
      headers.put("Content-Type", "application/json");
×
204

NEW
205
      FetchResponse response =
×
206
          eventsManager
NEW
207
              .getNetworkSession()
×
NEW
208
              .getNetworkClient()
×
NEW
209
              .fetch(
×
210
                  new FetchOptions.Builder(longPollWithStreamPosition, "GET")
NEW
211
                      .responseFormat(ResponseFormat.JSON)
×
NEW
212
                      .auth(eventsManager.getAuth())
×
NEW
213
                      .networkSession(eventsManager.getNetworkSession())
×
NEW
214
                      .headers(headers)
×
NEW
215
                      .build());
×
216

NEW
217
      if (stopped || stopLatch.getCount() == 0) {
×
NEW
218
        return EventStreamAction.STOP;
×
219
      }
220

NEW
221
      if (response.getStatus() == 200 && response.getData() != null) {
×
NEW
222
        JsonNode message = response.getData();
×
NEW
223
        String messageText = message.has("message") ? message.get("message").asText() : null;
×
224

NEW
225
        if (RealtimeServerEvent.NEW_CHANGE.getValue().equals(messageText)) {
×
NEW
226
          return EventStreamAction.FETCH_EVENTS;
×
NEW
227
        } else if (RealtimeServerEvent.RECONNECT.getValue().equals(messageText)) {
×
NEW
228
          return EventStreamAction.RECONNECT;
×
229
        }
230
      }
NEW
231
      return EventStreamAction.RETRY;
×
NEW
232
    } catch (Exception e) {
×
NEW
233
      if (!stopped && stopLatch.getCount() > 0) {
×
NEW
234
        return EventStreamAction.RETRY;
×
235
      }
NEW
236
      return EventStreamAction.STOP;
×
237
    }
238
  }
239

240
  private void fetchEvents() throws BoxSDKError {
NEW
241
    if (stopped || stopLatch.getCount() == 0) {
×
NEW
242
      return;
×
243
    }
244

245
    try {
NEW
246
      GetEventsQueryParams.Builder fetchParamsBuilder =
×
NEW
247
          new GetEventsQueryParams.Builder().streamPosition(streamPosition);
×
248

NEW
249
      if (queryParams.getLimit() != null) {
×
NEW
250
        fetchParamsBuilder.limit(queryParams.getLimit());
×
251
      }
NEW
252
      if (queryParams.getCreatedAfter() != null) {
×
NEW
253
        fetchParamsBuilder.createdAfter(queryParams.getCreatedAfter());
×
254
      }
NEW
255
      if (queryParams.getCreatedBefore() != null) {
×
NEW
256
        fetchParamsBuilder.createdBefore(queryParams.getCreatedBefore());
×
257
      }
258

NEW
259
      if (queryParams.getStreamType() != null) {
×
NEW
260
        String streamTypeValue = queryParams.getStreamType().getValue();
×
NEW
261
        GetEventsQueryParamsStreamTypeField streamType = convertStreamType(streamTypeValue);
×
NEW
262
        fetchParamsBuilder.streamType(streamType);
×
263
      }
264

NEW
265
      List<? extends Valuable> eventTypes =
×
NEW
266
          queryParams.getEventType() != null
×
NEW
267
              ? queryParams.getEventType().stream()
×
NEW
268
                  .map(enumWrapper -> convertEventType(enumWrapper.getValue()))
×
NEW
269
                  .collect(Collectors.toList())
×
270
              : null;
NEW
271
      if (eventTypes != null) {
×
NEW
272
        fetchParamsBuilder.eventType(eventTypes);
×
273
      }
274

NEW
275
      GetEventsQueryParams fetchParams = fetchParamsBuilder.build();
×
NEW
276
      GetEventsHeaders fetchHeaders =
×
277
          new GetEventsHeaders.Builder()
NEW
278
              .extraHeaders(
×
NEW
279
                  headersInput.getExtraHeaders() != null
×
NEW
280
                      ? headersInput.getExtraHeaders()
×
281
                      : new HashMap<>())
NEW
282
              .build();
×
283

NEW
284
      Events events = eventsManager.getEvents(fetchParams, fetchHeaders);
×
NEW
285
      if (events.getEntries() != null && !events.getEntries().isEmpty()) {
×
NEW
286
        for (Event event : events.getEntries()) {
×
NEW
287
          String eventId = event.getEventId();
×
NEW
288
          if (eventId != null && !dedupHash.containsKey(eventId)) {
×
NEW
289
            dedupHash.put(eventId, true);
×
NEW
290
            if (stopped || stopLatch.getCount() == 0) {
×
NEW
291
              return;
×
292
            }
NEW
293
            eventQueue.add(event);
×
294
          }
NEW
295
        }
×
296
      }
297

NEW
298
      streamPosition =
×
NEW
299
          events.getNextStreamPosition() != null
×
NEW
300
              ? (events.getNextStreamPosition().isString()
×
NEW
301
                  ? events.getNextStreamPosition().getString()
×
NEW
302
                  : String.valueOf(events.getNextStreamPosition().getLongNumber()))
×
303
              : "now";
304

NEW
305
      if (dedupHash.size() >= DEDUPLICATION_SIZE) {
×
NEW
306
        dedupHash.clear();
×
NEW
307
        for (Event event : events.getEntries()) {
×
NEW
308
          String eventId = event.getEventId();
×
NEW
309
          if (eventId != null) {
×
NEW
310
            dedupHash.put(eventId, true);
×
311
          }
NEW
312
        }
×
313
      }
NEW
314
    } catch (Exception e) {
×
NEW
315
      if (!stopped && stopLatch.getCount() > 0) {
×
NEW
316
        if (e instanceof BoxSDKError) {
×
NEW
317
          throw (BoxSDKError) e;
×
318
        } else {
NEW
319
          throw new BoxSDKError(
×
NEW
320
              "Failed to fetch events: " + (e.getMessage() != null ? e.getMessage() : ""), e);
×
321
        }
322
      }
NEW
323
    }
×
NEW
324
  }
×
325

326
  private GetEventsQueryParamsStreamTypeField convertStreamType(String streamTypeValue) {
NEW
327
    if (streamTypeValue == null) return null;
×
328
    try {
NEW
329
      return GetEventsQueryParamsStreamTypeField.valueOf(
×
NEW
330
          streamTypeValue.toUpperCase().replace(" ", "_"));
×
NEW
331
    } catch (IllegalArgumentException e) {
×
NEW
332
      throw new BoxSDKError("Invalid stream type: " + streamTypeValue);
×
333
    }
334
  }
335

336
  private Valuable convertEventType(String eventTypeValue) {
NEW
337
    if (eventTypeValue == null) return null;
×
338
    try {
NEW
339
      return GetEventsQueryParamsEventTypeField.valueOf(
×
NEW
340
          eventTypeValue.toUpperCase().replace(" ", "_"));
×
NEW
341
    } catch (IllegalArgumentException e) {
×
NEW
342
      throw new BoxSDKError("Invalid event type: " + eventTypeValue);
×
343
    }
344
  }
345
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc