• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Unleash / unleash-android / 11795258207

12 Nov 2024 10:21AM UTC coverage: 81.506% (-1.1%) from 82.638%
11795258207

Pull #97

github

web-flow
Merge dc7b3262f into a1093deb2
Pull Request #97: updated to use correct comparison for distinctUntilChanged

217 of 298 branches covered (72.82%)

Branch coverage included in aggregate %.

42 of 59 new or added lines in 2 files covered. (71.19%)

7 existing lines in 3 files now uncovered.

660 of 778 relevant lines covered (84.83%)

5.46 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

69.95
/unleashandroidsdk/src/main/java/io/getunleash/android/polling/UnleashFetcher.kt
1
package io.getunleash.android.polling
2

3
import android.util.Log
4
import io.getunleash.android.UnleashConfig
5
import io.getunleash.android.data.Parser.proxyResponseAdapter
6
import io.getunleash.android.data.UnleashContext
7
import io.getunleash.android.data.UnleashState
8
import io.getunleash.android.errors.NoBodyException
9
import io.getunleash.android.errors.NotAuthorizedException
10
import io.getunleash.android.errors.ServerException
11
import io.getunleash.android.events.HeartbeatEvent
12
import io.getunleash.android.http.Throttler
13
import io.getunleash.android.unleashScope
14
import java.io.Closeable
15
import java.io.IOException
16
import java.util.concurrent.TimeUnit
17
import java.util.concurrent.atomic.AtomicReference
18
import kotlin.coroutines.CoroutineContext
19
import kotlin.coroutines.resume
20
import kotlin.coroutines.resumeWithException
21
import kotlinx.coroutines.Dispatchers
22
import kotlinx.coroutines.channels.BufferOverflow
23
import kotlinx.coroutines.flow.MutableSharedFlow
24
import kotlinx.coroutines.flow.SharedFlow
25
import kotlinx.coroutines.flow.StateFlow
26
import kotlinx.coroutines.flow.asSharedFlow
27
import kotlinx.coroutines.flow.distinctUntilChanged
28
import kotlinx.coroutines.launch
29
import kotlinx.coroutines.suspendCancellableCoroutine
30
import kotlinx.coroutines.withContext
31
import okhttp3.Call
32
import okhttp3.Callback
33
import okhttp3.Headers.Companion.toHeaders
34
import okhttp3.HttpUrl
35
import okhttp3.HttpUrl.Companion.toHttpUrl
36
import okhttp3.OkHttpClient
37
import okhttp3.Request
38
import okhttp3.Response
39
import okhttp3.internal.closeQuietly
40

41
/**
42
 * Http Client for fetching data from Unleash Proxy. By default creates an OkHttpClient with
43
 * readTimeout set to 2 seconds and a cache of 10 MBs
44
 * @param httpClient
45
 * - the http client to use for fetching toggles from Unleash proxy
46
 */
47
open class UnleashFetcher(
3✔
48
        unleashConfig: UnleashConfig,
49
        private val httpClient: OkHttpClient,
3✔
50
        private val unleashContext: StateFlow<UnleashContext>,
3✔
51
) : Closeable {
52
    companion object {
53
        private const val TAG = "UnleashFetcher"
54
    }
55
    @Volatile private var contextForLastFetch: UnleashContext? = null
56
    private val proxyUrl = unleashConfig.proxyUrl?.toHttpUrl()
12✔
57
    private val applicationHeaders =
58
            unleashConfig.getApplicationHeaders(unleashConfig.pollingStrategy)
6✔
59
    private val appName = unleashConfig.appName
4✔
60
    private var etag: String? = null
61
    private val featuresReceivedFlow =
62
            MutableSharedFlow<UnleashState>(
6✔
63
                    replay = 1,
1✔
64
                    onBufferOverflow = BufferOverflow.DROP_OLDEST
1✔
65
            )
66
    private val fetcherHeartbeatFlow =
67
            MutableSharedFlow<HeartbeatEvent>(
6✔
68
                    extraBufferCapacity = 5,
1✔
69
                    onBufferOverflow = BufferOverflow.DROP_OLDEST
1✔
70
            )
71
    private val coroutineContextForContextChange: CoroutineContext = Dispatchers.IO
4✔
72
    private val currentCall = AtomicReference<Call?>(null)
6✔
73
    private val throttler =
74
            Throttler(
5✔
75
                    TimeUnit.MILLISECONDS.toSeconds(unleashConfig.pollingStrategy.interval),
5✔
76
                    longestAcceptableIntervalSeconds = 300,
1✔
77
                    proxyUrl.toString()
3✔
78
            )
79

80
    fun getFeaturesReceivedFlow() = featuresReceivedFlow.asSharedFlow()
4✔
81

82
    fun startWatchingContext() {
83
        unleashScope.launch {
13✔
84
            unleashContext.collect {
15✔
85
                if (it == contextForLastFetch) {
6✔
86
                    Log.d(TAG, "Context unchanged, skipping refresh toggles")
4✔
87
                    return@collect
2✔
88
                }
89
                withContext(coroutineContextForContextChange) {
13✔
90
                    Log.d(TAG, "Unleash context changed: $it")
12✔
91
                    refreshToggles()
8✔
92
                }
3✔
93
            }
×
94
        }
95
    }
1✔
96

97
    suspend fun refreshToggles(): ToggleResponse {
2✔
98
        if (throttler.performAction()) {
4✔
99
            Log.d(TAG, "Refreshing toggles")
4✔
100
            val response = doFetchToggles(unleashContext.value)
13✔
101
            fetcherHeartbeatFlow.emit(HeartbeatEvent(response.status, response.error?.message))
23✔
102
            return response
3✔
103
        }
104
        Log.i(TAG, "Skipping refresh toggles due to throttling")
4✔
105
        fetcherHeartbeatFlow.emit(HeartbeatEvent(Status.THROTTLED))
14✔
106
        return ToggleResponse(Status.THROTTLED)
10✔
107
    }
108

109
    suspend fun refreshTogglesWithContext(ctx: UnleashContext): ToggleResponse {
2✔
110
        if (throttler.performAction()) {
4!
111
            Log.d(TAG, "Refreshing toggles")
4✔
112
            val response = doFetchToggles(ctx)
10✔
113
            fetcherHeartbeatFlow.emit(HeartbeatEvent(response.status, response.error?.message))
21!
114
            return response
3✔
115
        }
NEW
116
        Log.i(TAG, "Skipping refresh toggles due to throttling")
×
NEW
117
        fetcherHeartbeatFlow.emit(HeartbeatEvent(Status.THROTTLED))
×
NEW
118
        return ToggleResponse(Status.THROTTLED)
×
119
    }
120

121
    internal suspend fun doFetchToggles(ctx: UnleashContext): ToggleResponse {
2✔
122
        contextForLastFetch = ctx
3✔
123
        val response = fetchToggles(ctx)
13✔
124
        if (response.isSuccess()) {
3✔
125

126
            val toggles =
1✔
127
                    response.config!!.toggles.groupBy { it.name }.mapValues { (_, v) -> v.first() }
24✔
128
            Log.d(
2✔
129
                    TAG,
1✔
130
                    "Fetched new state with ${toggles.size} toggles, emitting featuresReceivedFlow"
11✔
131
            )
132
            featuresReceivedFlow.emit(UnleashState(ctx, toggles))
18✔
133
            return ToggleResponse(response.status, toggles)
11✔
134
        } else {
135
            if (response.isFailed()) {
3✔
136
                if (response.error is NotAuthorizedException) {
4!
137
                    Log.e(TAG, "Not authorized to fetch toggles. Double check your SDK key")
×
138
                } else {
139
                    Log.i(TAG, "Failed to fetch toggles ${response.error?.message}", response.error)
19!
140
                }
141
            }
142
        }
143
        return ToggleResponse(response.status, error = response.error)
11✔
144
    }
145

146
    private suspend fun fetchToggles(ctx: UnleashContext): FetchResponse {
2✔
147
        if (proxyUrl == null) {
3!
NEW
148
            return FetchResponse(
×
NEW
149
                    Status.FAILED,
×
NEW
150
                    error = IllegalStateException("Proxy URL is not set")
×
151
            )
152
        }
153
        val contextUrl = buildContextUrl(ctx)
4✔
154
        try {
1✔
155
            val request = Request.Builder().url(contextUrl).headers(applicationHeaders.toHeaders())
11✔
156
            if (etag != null) {
3!
157
                request.header("If-None-Match", etag!!)
×
158
            }
159
            val call = this.httpClient.newCall(request.build())
6✔
160
            val inFlightCall = currentCall.get()
5✔
161
            if (!currentCall.compareAndSet(inFlightCall, call)) {
6!
162
                return FetchResponse(
×
NEW
163
                        Status.FAILED,
×
164
                        error =
NEW
165
                                IllegalStateException(
×
NEW
166
                                        "Failed to set new call while ${inFlightCall?.request()?.url} is in flight"
×
167
                                )
168
                )
169
            } else if (inFlightCall != null && !inFlightCall.isCanceled() && !inFlightCall.isExecuted()) {
8!
UNCOV
170
                Log.d(
×
NEW
171
                        TAG,
×
NEW
172
                        "Cancelling previous ${inFlightCall.request().method} ${inFlightCall.request().url}"
×
173
                )
UNCOV
174
                inFlightCall.cancel()
×
175
            }
176

177
            Log.d(TAG, "Fetching toggles from $contextUrl")
11✔
178
            val response = call.await()
13✔
179
            response.use { res ->
11✔
180
                Log.d(TAG, "Received status code ${res.code} from $contextUrl")
16✔
181
                throttler.handle(response.code)
5✔
182
                return when {
7✔
183
                    res.isSuccessful -> {
3✔
184
                        etag = res.header("ETag")
8✔
185
                        res.body?.use { b ->
21!
186
                            try {
2✔
187
                                val proxyResponse: ProxyResponse =
1✔
188
                                        proxyResponseAdapter.fromJson(b.string())!!
8✔
189
                                FetchResponse(Status.SUCCESS, proxyResponse)
10✔
190
                            } catch (e: Exception) {
×
191
                                // If we fail to parse, just keep data
192
                                FetchResponse(Status.FAILED, error = e)
1✔
193
                            }
194
                        }
NEW
195
                                ?: FetchResponse(Status.FAILED, error = NoBodyException())
×
196
                    }
197
                    res.code == 304 -> {
4✔
198
                        FetchResponse(Status.NOT_MODIFIED)
9✔
199
                    }
200
                    res.code == 401 -> {
4!
UNCOV
201
                        FetchResponse(Status.FAILED, error = NotAuthorizedException())
×
202
                    }
203
                    else -> {
204
                        FetchResponse(Status.FAILED, error = ServerException(res.code))
13✔
205
                    }
206
                }
207
            }
UNCOV
208
        } catch (e: IOException) {
×
UNCOV
209
            return FetchResponse(status = Status.FAILED, error = e)
×
210
        }
211
    }
212

213
    private suspend fun Call.await(): Response {
214
        return suspendCancellableCoroutine { continuation ->
3✔
215
            enqueue(
2✔
216
                    object : Callback {
8✔
217
                        override fun onResponse(call: Call, response: Response) {
218
                            continuation.resume(response)
8✔
219
                        }
1✔
220

221
                        override fun onFailure(call: Call, e: IOException) {
222
                            // Don't bother with resuming the continuation if it is already
223
                            // cancelled.
NEW
224
                            if (continuation.isCancelled) return
×
NEW
225
                            continuation.resumeWithException(e)
×
NEW
226
                        }
×
227
                    }
228
            )
229

230
            continuation.invokeOnCancellation {
7✔
231
                try {
×
232
                    cancel()
×
233
                } catch (ex: Throwable) {
×
234
                    // Ignore cancel exception
235
                }
236
            }
×
237
        }
1✔
238
    }
239

240
    private fun buildContextUrl(ctx: UnleashContext): HttpUrl {
241
        var contextUrl = proxyUrl!!.newBuilder().addQueryParameter("appName", appName)
12✔
242
        if (ctx.userId != null) {
3✔
243
            contextUrl.addQueryParameter("userId", ctx.userId)
6✔
244
        }
245
        if (ctx.remoteAddress != null) {
3!
246
            contextUrl.addQueryParameter("remoteAddress", ctx.remoteAddress)
×
247
        }
248
        if (ctx.sessionId != null) {
3!
249
            contextUrl.addQueryParameter("sessionId", ctx.sessionId)
×
250
        }
251
        ctx.properties.entries.forEach {
7✔
252
            contextUrl = contextUrl.addQueryParameter("properties[${it.key}]", it.value)
×
253
        }
×
254
        return contextUrl.build()
3✔
255
    }
256

257
    override fun close() {
258
        httpClient.dispatcher.executorService.shutdownNow()
×
259
        httpClient.connectionPool.evictAll()
×
260
        httpClient.cache?.closeQuietly()
×
261
    }
×
262

263
    fun getHeartbeatFlow(): SharedFlow<HeartbeatEvent> {
264
        return fetcherHeartbeatFlow.asSharedFlow()
4✔
265
    }
266
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc