• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

box / box-java-sdk / #5879

17 Dec 2025 04:15PM UTC coverage: 12.895% (-0.008%) from 12.903%
#5879

Pull #1641

github

web-flow
Merge 9a2ba2654 into e14e97bb7
Pull Request #1641: fix(boxsdkgen): replace internal links with absolute links (box/box-openapi#570)

8368 of 64891 relevant lines covered (12.9%)

0.13 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/src/main/java/com/box/sdkgen/box/eventstream/EventStreamIterator.java
1
package com.box.sdkgen.box.eventstream;
2

3
import com.box.sdkgen.box.errors.BoxSDKError;
4
import com.box.sdkgen.managers.events.EventsManager;
5
import com.box.sdkgen.managers.events.GetEventStreamHeaders;
6
import com.box.sdkgen.managers.events.GetEventStreamQueryParams;
7
import com.box.sdkgen.managers.events.GetEventsHeaders;
8
import com.box.sdkgen.managers.events.GetEventsQueryParams;
9
import com.box.sdkgen.managers.events.GetEventsQueryParamsEventTypeField;
10
import com.box.sdkgen.managers.events.GetEventsQueryParamsStreamTypeField;
11
import com.box.sdkgen.networking.fetchoptions.FetchOptions;
12
import com.box.sdkgen.networking.fetchoptions.ResponseFormat;
13
import com.box.sdkgen.networking.fetchresponse.FetchResponse;
14
import com.box.sdkgen.schemas.event.Event;
15
import com.box.sdkgen.schemas.events.Events;
16
import com.box.sdkgen.schemas.realtimeserver.RealtimeServer;
17
import com.box.sdkgen.schemas.realtimeservers.RealtimeServers;
18
import com.box.sdkgen.serialization.json.Valuable;
19
import com.fasterxml.jackson.databind.JsonNode;
20
import java.util.ArrayList;
21
import java.util.HashMap;
22
import java.util.Iterator;
23
import java.util.List;
24
import java.util.Map;
25
import java.util.NoSuchElementException;
26
import java.util.concurrent.CountDownLatch;
27
import java.util.stream.Collectors;
28

29
enum RealtimeServerEvent {
×
30
  NEW_CHANGE("new_change"),
×
31
  RECONNECT("reconnect");
×
32

33
  private final String value;
34

35
  RealtimeServerEvent(String value) {
×
36
    this.value = value;
×
37
  }
×
38

39
  public String getValue() {
40
    return value;
×
41
  }
42
}
43

44
enum EventStreamAction {
×
45
  FETCH_EVENTS,
×
46
  RECONNECT,
×
47
  RETRY,
×
48
  STOP
×
49
}
50

51
class EventStreamIterator implements Iterator<Event> {
52
  private static final int DEDUPLICATION_SIZE = 1000;
53
  private final EventsManager eventsManager;
54
  private final GetEventStreamQueryParams queryParams;
55
  private final GetEventStreamHeaders headersInput;
56
  private String streamPosition;
57
  private RealtimeServer longPollInfo;
58
  private volatile boolean stopped;
59
  private final CountDownLatch stopLatch;
60
  private final Map<String, Boolean> dedupHash;
61
  private final List<Event> eventQueue;
62
  private boolean started;
63
  private int longPollingRetries;
64
  private static final int DEFAULT_MAX_RETRIES = 10;
65

66
  EventStreamIterator(
67
      EventsManager eventsManager,
68
      GetEventStreamQueryParams queryParams,
69
      GetEventStreamHeaders headersInput) {
×
70
    this.eventsManager = eventsManager;
×
71
    this.queryParams = queryParams;
×
72
    this.headersInput = headersInput;
×
73
    this.streamPosition =
×
74
        queryParams.getStreamPosition() != null ? queryParams.getStreamPosition() : "now";
×
75
    this.longPollInfo = null;
×
76
    this.started = false;
×
77
    this.stopped = false;
×
78
    this.stopLatch = new CountDownLatch(1);
×
79
    this.dedupHash = new HashMap<>();
×
80
    this.eventQueue = new ArrayList<>();
×
81
    this.longPollingRetries = 0;
×
82
  }
×
83

84
  @Override
85
  public boolean hasNext() {
86
    return !eventQueue.isEmpty() || (!stopped && stopLatch.getCount() > 0);
×
87
  }
88

89
  @Override
90
  public Event next() {
91
    if (!hasNext()) {
×
92
      stop();
×
93
      throw new NoSuchElementException("No more events available or stream stopped");
×
94
    }
95
    if (!eventQueue.isEmpty()) {
×
96
      return eventQueue.remove(0);
×
97
    }
98
    if (!started) {
×
99
      started = true;
×
100
      fetchEvents();
×
101
      if (!eventQueue.isEmpty()) {
×
102
        return eventQueue.remove(0);
×
103
      }
104
    }
105

106
    getLongPollInfo();
×
107
    while (!stopped && stopLatch.getCount() > 0) {
×
108
      try {
109
        EventStreamAction action = doLongPoll();
×
110
        switch (action) {
×
111
          case FETCH_EVENTS:
112
            fetchEvents();
×
113
            if (!eventQueue.isEmpty()) {
×
114
              return eventQueue.remove(0);
×
115
            }
116
            break;
117
          case RETRY:
118
            try {
119
              Thread.sleep(5000);
×
120
            } catch (InterruptedException ie) {
×
121
              Thread.currentThread().interrupt();
×
122
            }
×
123
            continue;
×
124
          case RECONNECT:
125
            getLongPollInfo();
×
126
            continue;
×
127
          case STOP:
128
            stop();
×
129
            break;
130
        }
131
      } catch (Exception e) {
×
132
        boolean isBoxError = e instanceof BoxSDKError;
×
133
        if (!isBoxError) {
×
134
          throw e;
×
135
        }
136
      }
×
137
    }
138
    throw new NoSuchElementException("Stream stopped");
×
139
  }
140

141
  public void stop() {
142
    if (!stopped) {
×
143
      this.stopped = true;
×
144
      this.stopLatch.countDown();
×
145
    }
146
  }
×
147

148
  private void getLongPollInfo() throws BoxSDKError {
149
    if (stopped || stopLatch.getCount() == 0) {
×
150
      return;
×
151
    }
152

153
    try {
154
      RealtimeServers info = eventsManager.getEventsWithLongPolling();
×
155
      RealtimeServer server = null;
×
156
      if (info.getEntries() != null) {
×
157
        for (RealtimeServer entry : info.getEntries()) {
×
158
          if (entry.getType().equals("realtime_server")) {
×
159
            server = entry;
×
160
            break;
×
161
          }
162
        }
×
163
      }
164

165
      if (server == null) {
×
166
        throw new BoxSDKError("No realtime server found in the response.");
×
167
      }
168

169
      this.longPollInfo = server;
×
170
      this.longPollingRetries = 0;
×
171

172
    } catch (Exception e) {
×
173
      if (!stopped && stopLatch.getCount() > 0) {
×
174
        throw new BoxSDKError(
×
175
            "Failed to fetch long polling info: " + (e.getMessage() != null ? e.getMessage() : ""),
×
176
            e);
177
      }
178
    }
×
179
  }
×
180

181
  private EventStreamAction doLongPoll() throws BoxSDKError {
182
    if (stopped || stopLatch.getCount() == 0) {
×
183
      return EventStreamAction.STOP;
×
184
    }
185

186
    try {
187
      int maxRetries =
×
188
          (longPollInfo != null && longPollInfo.getMaxRetries() != null)
×
189
              ? Integer.parseInt(longPollInfo.getMaxRetries())
×
190
              : DEFAULT_MAX_RETRIES;
191

192
      if (longPollInfo == null || longPollingRetries > maxRetries) {
×
193
        getLongPollInfo();
×
194
      }
195

196
      longPollingRetries++;
×
197

198
      String longPollUrl = longPollInfo.getUrl();
×
199
      String separator = longPollUrl.contains("?") ? "&" : "?";
×
200
      String longPollWithStreamPosition =
×
201
          longPollUrl + separator + "stream_position=" + streamPosition;
202
      Map<String, String> headers = new HashMap<>();
×
203
      headers.put("Content-Type", "application/json");
×
204

205
      FetchResponse response =
×
206
          eventsManager
207
              .getNetworkSession()
×
208
              .getNetworkClient()
×
209
              .fetch(
×
210
                  new FetchOptions.Builder(longPollWithStreamPosition, "GET")
211
                      .responseFormat(ResponseFormat.JSON)
×
212
                      .auth(eventsManager.getAuth())
×
213
                      .networkSession(eventsManager.getNetworkSession())
×
214
                      .headers(headers)
×
215
                      .build());
×
216

217
      if (stopped || stopLatch.getCount() == 0) {
×
218
        return EventStreamAction.STOP;
×
219
      }
220

221
      if (response.getStatus() == 200 && response.getData() != null) {
×
222
        JsonNode message = response.getData();
×
223
        String messageText = message.has("message") ? message.get("message").asText() : null;
×
224

225
        if (RealtimeServerEvent.NEW_CHANGE.getValue().equals(messageText)) {
×
226
          return EventStreamAction.FETCH_EVENTS;
×
227
        } else if (RealtimeServerEvent.RECONNECT.getValue().equals(messageText)) {
×
228
          return EventStreamAction.RECONNECT;
×
229
        }
230
      }
231
      return EventStreamAction.RETRY;
×
232
    } catch (Exception e) {
×
233
      if (!stopped && stopLatch.getCount() > 0) {
×
234
        return EventStreamAction.RETRY;
×
235
      }
236
      return EventStreamAction.STOP;
×
237
    }
238
  }
239

240
  private void fetchEvents() throws BoxSDKError {
241
    if (stopped || stopLatch.getCount() == 0) {
×
242
      return;
×
243
    }
244

245
    try {
246
      GetEventsQueryParams.Builder fetchParamsBuilder =
×
247
          new GetEventsQueryParams.Builder().streamPosition(streamPosition);
×
248

249
      if (queryParams.getLimit() != null) {
×
250
        fetchParamsBuilder.limit(queryParams.getLimit());
×
251
      }
252
      if (queryParams.getCreatedAfter() != null) {
×
253
        fetchParamsBuilder.createdAfter(queryParams.getCreatedAfter());
×
254
      }
255
      if (queryParams.getCreatedBefore() != null) {
×
256
        fetchParamsBuilder.createdBefore(queryParams.getCreatedBefore());
×
257
      }
258

259
      if (queryParams.getStreamType() != null) {
×
260
        String streamTypeValue = queryParams.getStreamType().getValue();
×
261
        GetEventsQueryParamsStreamTypeField streamType = convertStreamType(streamTypeValue);
×
262
        fetchParamsBuilder.streamType(streamType);
×
263
      }
264

265
      List<? extends Valuable> eventTypes =
×
266
          queryParams.getEventType() != null
×
267
              ? queryParams.getEventType().stream()
×
268
                  .map(enumWrapper -> convertEventType(enumWrapper.getValue()))
×
269
                  .collect(Collectors.toList())
×
270
              : null;
271
      if (eventTypes != null) {
×
272
        fetchParamsBuilder.eventType(eventTypes);
×
273
      }
274

275
      GetEventsQueryParams fetchParams = fetchParamsBuilder.build();
×
276
      GetEventsHeaders fetchHeaders =
×
277
          new GetEventsHeaders.Builder()
278
              .extraHeaders(
×
279
                  headersInput.getExtraHeaders() != null
×
280
                      ? headersInput.getExtraHeaders()
×
281
                      : new HashMap<>())
282
              .build();
×
283

284
      Events events = eventsManager.getEvents(fetchParams, fetchHeaders);
×
285
      if (events.getEntries() != null && !events.getEntries().isEmpty()) {
×
286
        for (Event event : events.getEntries()) {
×
287
          String eventId = event.getEventId();
×
288
          if (eventId != null && !dedupHash.containsKey(eventId)) {
×
289
            dedupHash.put(eventId, true);
×
290
            if (stopped || stopLatch.getCount() == 0) {
×
291
              return;
×
292
            }
293
            eventQueue.add(event);
×
294
          }
295
        }
×
296
      }
297

298
      streamPosition =
×
299
          events.getNextStreamPosition() != null
×
300
              ? (events.getNextStreamPosition().isString()
×
301
                  ? events.getNextStreamPosition().getString()
×
302
                  : String.valueOf(events.getNextStreamPosition().getLongNumber()))
×
303
              : "now";
304

305
      if (dedupHash.size() >= DEDUPLICATION_SIZE) {
×
306
        dedupHash.clear();
×
307
        for (Event event : events.getEntries()) {
×
308
          String eventId = event.getEventId();
×
309
          if (eventId != null) {
×
310
            dedupHash.put(eventId, true);
×
311
          }
312
        }
×
313
      }
314
    } catch (Exception e) {
×
315
      if (!stopped && stopLatch.getCount() > 0) {
×
316
        if (e instanceof BoxSDKError) {
×
317
          throw (BoxSDKError) e;
×
318
        } else {
319
          throw new BoxSDKError(
×
320
              "Failed to fetch events: " + (e.getMessage() != null ? e.getMessage() : ""), e);
×
321
        }
322
      }
323
    }
×
324
  }
×
325

326
  private GetEventsQueryParamsStreamTypeField convertStreamType(String streamTypeValue) {
327
    if (streamTypeValue == null) return null;
×
328
    try {
329
      return GetEventsQueryParamsStreamTypeField.valueOf(
×
330
          streamTypeValue.toUpperCase().replace(" ", "_"));
×
331
    } catch (IllegalArgumentException e) {
×
332
      throw new BoxSDKError("Invalid stream type: " + streamTypeValue);
×
333
    }
334
  }
335

336
  private Valuable convertEventType(String eventTypeValue) {
337
    if (eventTypeValue == null) return null;
×
338
    try {
339
      return GetEventsQueryParamsEventTypeField.valueOf(
×
340
          eventTypeValue.toUpperCase().replace(" ", "_"));
×
341
    } catch (IllegalArgumentException e) {
×
342
      throw new BoxSDKError("Invalid event type: " + eventTypeValue);
×
343
    }
344
  }
345
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc