• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Unleash / unleash-android / 10096795707

25 Jul 2024 03:15PM CUT coverage: 82.319%. Remained the same
10096795707

Pull #73

github

web-flow
Merge 6148df0d2 into 1977dc1e9
Pull Request #73: chore(deps): update dependency org.jetbrains.kotlin.android to v1.9.25

209 of 284 branches covered (73.59%)

Branch coverage included in aggregate %.

643 of 751 relevant lines covered (85.62%)

5.45 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.65
/unleashandroidsdk/src/main/java/io/getunleash/android/polling/UnleashFetcher.kt
1
package io.getunleash.android.polling
2

3
import android.util.Log
4
import io.getunleash.android.UnleashConfig
5
import io.getunleash.android.data.Parser.proxyResponseAdapter
6
import io.getunleash.android.data.UnleashContext
7
import io.getunleash.android.data.UnleashState
8
import io.getunleash.android.errors.NoBodyException
9
import io.getunleash.android.errors.NotAuthorizedException
10
import io.getunleash.android.errors.ServerException
11
import io.getunleash.android.events.HeartbeatEvent
12
import io.getunleash.android.http.Throttler
13
import io.getunleash.android.unleashScope
14
import kotlinx.coroutines.Dispatchers
15
import kotlinx.coroutines.channels.BufferOverflow
16
import kotlinx.coroutines.flow.MutableSharedFlow
17
import kotlinx.coroutines.flow.SharedFlow
18
import kotlinx.coroutines.flow.StateFlow
19
import kotlinx.coroutines.flow.asSharedFlow
20
import kotlinx.coroutines.flow.distinctUntilChanged
21
import kotlinx.coroutines.launch
22
import kotlinx.coroutines.suspendCancellableCoroutine
23
import kotlinx.coroutines.withContext
24
import okhttp3.Call
25
import okhttp3.Callback
26
import okhttp3.Headers.Companion.toHeaders
27
import okhttp3.HttpUrl
28
import okhttp3.HttpUrl.Companion.toHttpUrl
29
import okhttp3.OkHttpClient
30
import okhttp3.Request
31
import okhttp3.Response
32
import okhttp3.internal.closeQuietly
33
import java.io.Closeable
34
import java.io.IOException
35
import java.util.concurrent.TimeUnit
36
import java.util.concurrent.atomic.AtomicReference
37
import kotlin.coroutines.CoroutineContext
38
import kotlin.coroutines.resume
39
import kotlin.coroutines.resumeWithException
40

41
/**
42
 * Http Client for fetching data from Unleash Proxy.
43
 * By default creates an OkHttpClient with readTimeout set to 2 seconds and a cache of 10 MBs
44
 * @param httpClient - the http client to use for fetching toggles from Unleash proxy
45
 */
46
open class UnleashFetcher(
3✔
47
    unleashConfig: UnleashConfig,
48
    private val httpClient: OkHttpClient,
3✔
49
    private val unleashContext: StateFlow<UnleashContext>,
3✔
50
) : Closeable {
51
    companion object {
52
        private const val TAG = "UnleashFetcher"
53
    }
54

55
    private val proxyUrl = unleashConfig.proxyUrl.toHttpUrl()
6✔
56
    private val applicationHeaders = unleashConfig.getApplicationHeaders(unleashConfig.pollingStrategy)
6✔
57
    private val appName = unleashConfig.appName
4✔
58
    private var etag: String? = null
59
    private val featuresReceivedFlow = MutableSharedFlow<UnleashState>(
6✔
60
        replay = 1,
1✔
61
        onBufferOverflow = BufferOverflow.DROP_OLDEST
1✔
62
    )
63
    private val fetcherHeartbeatFlow = MutableSharedFlow<HeartbeatEvent>(
6✔
64
        extraBufferCapacity = 5,
1✔
65
        onBufferOverflow = BufferOverflow.DROP_OLDEST
1✔
66
    )
67
    private val coroutineContextForContextChange: CoroutineContext = Dispatchers.IO
4✔
68
    private val currentCall = AtomicReference<Call?>(null)
6✔
69
    private val throttler =
70
        Throttler(
5✔
71
            TimeUnit.MILLISECONDS.toSeconds(unleashConfig.pollingStrategy.interval),
5✔
72
            longestAcceptableIntervalSeconds = 300,
1✔
73
            proxyUrl.toString()
3✔
74
        )
75

76
    fun getFeaturesReceivedFlow() = featuresReceivedFlow.asSharedFlow()
4✔
77

78
    fun startWatchingContext() {
79
        unleashScope.launch {
13✔
80
            unleashContext.distinctUntilChanged { old, new -> old != new }.collect {
27!
81
                withContext(coroutineContextForContextChange) {
13✔
82
                    Log.d(TAG, "Unleash context changed: $it")
12✔
83
                    refreshToggles()
8✔
84
                }
3✔
85
            }
×
86
        }
×
87
    }
1✔
88

89
    suspend fun refreshToggles(): ToggleResponse {
2✔
90
        if (throttler.performAction()) {
4✔
91
            Log.d(TAG, "Refreshing toggles")
4✔
92
            val response = refreshTogglesWithContext(unleashContext.value)
13✔
93
            fetcherHeartbeatFlow.emit(HeartbeatEvent(response.status, response.error?.message))
23✔
94
            return response
3✔
95
        }
96
        Log.i(TAG, "Skipping refresh toggles due to throttling")
4✔
97
        fetcherHeartbeatFlow.emit(HeartbeatEvent(Status.THROTTLED))
14✔
98
        return ToggleResponse(Status.THROTTLED)
10✔
99
    }
100

101
    internal suspend fun refreshTogglesWithContext(ctx: UnleashContext): ToggleResponse {
2✔
102
        val response = fetchToggles(ctx)
13✔
103
        if (response.isSuccess()) {
3✔
104

105
            val toggles = response.config!!.toggles.groupBy { it.name }
12✔
106
                .mapValues { (_, v) -> v.first() }
13✔
107
            Log.d(
2✔
108
                TAG,
1✔
109
                "Fetched new state with ${toggles.size} toggles, emitting featuresReceivedFlow"
11✔
110
            )
111
            featuresReceivedFlow.emit(UnleashState(ctx, toggles))
18✔
112
            return ToggleResponse(response.status, toggles)
11✔
113
        } else {
114
            if (response.isFailed()) {
3✔
115
                if (response.error is NotAuthorizedException) {
4!
116
                    Log.e(TAG, "Not authorized to fetch toggles. Double check your SDK key")
×
117
                } else {
118
                    Log.i(TAG, "Failed to fetch toggles ${response.error?.message}", response.error)
19!
119
                }
120
            }
121
        }
122
        return ToggleResponse(response.status, error = response.error)
11✔
123
    }
124

125
    private suspend fun fetchToggles(ctx: UnleashContext): FetchResponse {
2✔
126
        val contextUrl = buildContextUrl(ctx)
4✔
127
        try {
1✔
128
            val request = Request.Builder().url(contextUrl)
6✔
129
                .headers(applicationHeaders.toHeaders())
5✔
130
            if (etag != null) {
3!
131
                request.header("If-None-Match", etag!!)
×
132
            }
133
            val call = this.httpClient.newCall(request.build())
6✔
134
            val inFlightCall = currentCall.get()
5✔
135
            if (!currentCall.compareAndSet(inFlightCall, call)) {
6!
136
                return FetchResponse(
×
137
                    Status.FAILED,
×
138
                    error = IllegalStateException("Failed to set new call while ${inFlightCall?.request()?.url} is in flight")
×
139
                )
140
            } else if (inFlightCall != null && !inFlightCall.isCanceled()) {
5!
141
                Log.d(
2✔
142
                    TAG,
1✔
143
                    "Cancelling previous ${inFlightCall.request().method} ${inFlightCall.request().url}"
16✔
144
                )
145
                inFlightCall.cancel()
2✔
146
            }
147

148
            Log.d(TAG, "Fetching toggles from $contextUrl")
11✔
149
            val response = call.await()
13✔
150
            response.use { res ->
11✔
151
                Log.d(TAG, "Received status code ${res.code} from $contextUrl")
16✔
152
                throttler.handle(response.code)
5✔
153
                return when {
7✔
154
                    res.isSuccessful -> {
3✔
155
                        etag = res.header("ETag")
8✔
156
                        res.body?.use { b ->
21!
157
                            try {
2✔
158
                                val proxyResponse: ProxyResponse =
1✔
159
                                    proxyResponseAdapter.fromJson(b.string())!!
8✔
160
                                FetchResponse(Status.SUCCESS, proxyResponse)
10✔
161
                            } catch (e: Exception) {
×
162
                                // If we fail to parse, just keep data
163
                                FetchResponse(Status.FAILED, error = e)
1✔
164
                            }
165
                        } ?: FetchResponse(Status.FAILED, error = NoBodyException())
×
166
                    }
167

168
                    res.code == 304 -> {
4✔
169
                        FetchResponse(Status.NOT_MODIFIED)
9✔
170
                    }
171

172
                    res.code == 401 -> {
4!
173
                        FetchResponse(Status.FAILED, error = NotAuthorizedException())
×
174
                    }
175

176
                    else -> {
177
                        FetchResponse(Status.FAILED, error = ServerException(res.code))
13✔
178
                    }
179
                }
180
            }
181
        } catch (e: IOException) {
1✔
182
            return FetchResponse(status = Status.FAILED, error = e)
10✔
183
        }
184
    }
185

186
    private suspend fun Call.await(): Response {
187
        return suspendCancellableCoroutine { continuation ->
3✔
188
            enqueue(object : Callback {
10✔
189
                override fun onResponse(call: Call, response: Response) {
190
                    continuation.resume(response)
8✔
191
                }
1✔
192

193
                override fun onFailure(call: Call, e: IOException) {
194
                    // Don't bother with resuming the continuation if it is already cancelled.
195
                    if (continuation.isCancelled) return
4!
196
                    continuation.resumeWithException(e)
10✔
197
                }
1✔
198
            })
199

200
            continuation.invokeOnCancellation {
7✔
201
                try {
×
202
                    cancel()
×
203
                } catch (ex: Throwable) {
×
204
                    //Ignore cancel exception
205
                }
206
            }
×
207
        }
1✔
208
    }
209

210
    private fun buildContextUrl(ctx: UnleashContext): HttpUrl {
211
        var contextUrl = proxyUrl.newBuilder()
6✔
212
            .addQueryParameter("appName", appName)
4✔
213
        if (ctx.userId != null) {
3✔
214
            contextUrl.addQueryParameter("userId", ctx.userId)
6✔
215
        }
216
        if (ctx.remoteAddress != null) {
3!
217
            contextUrl.addQueryParameter("remoteAddress", ctx.remoteAddress)
×
218
        }
219
        if (ctx.sessionId != null) {
3!
220
            contextUrl.addQueryParameter("sessionId", ctx.sessionId)
×
221
        }
222
        ctx.properties.entries.forEach {
7✔
223
            contextUrl = contextUrl.addQueryParameter("properties[${it.key}]", it.value)
×
224
        }
×
225
        return contextUrl.build()
3✔
226
    }
227

228
    override fun close() {
229
        httpClient.dispatcher.executorService.shutdownNow()
×
230
        httpClient.connectionPool.evictAll()
×
231
        httpClient.cache?.closeQuietly()
×
232
    }
×
233

234
    fun getHeartbeatFlow(): SharedFlow<HeartbeatEvent> {
235
        return fetcherHeartbeatFlow.asSharedFlow()
4✔
236
    }
237
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc