• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Unleash / unleash-android / 11297355926

11 Oct 2024 06:19PM UTC coverage: 82.543% (-0.1%) from 82.638%
11297355926

push

github

web-flow
docs: use defaults for polling and metrics intervals (#92)

chore: use defaults for polling and metrics intervals

214 of 292 branches covered (73.29%)

Branch coverage included in aggregate %.

656 of 762 relevant lines covered (86.09%)

5.51 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.3
/unleashandroidsdk/src/main/java/io/getunleash/android/polling/UnleashFetcher.kt
1
package io.getunleash.android.polling
2

3
import android.util.Log
4
import io.getunleash.android.UnleashConfig
5
import io.getunleash.android.data.Parser.proxyResponseAdapter
6
import io.getunleash.android.data.UnleashContext
7
import io.getunleash.android.data.UnleashState
8
import io.getunleash.android.errors.NoBodyException
9
import io.getunleash.android.errors.NotAuthorizedException
10
import io.getunleash.android.errors.ServerException
11
import io.getunleash.android.events.HeartbeatEvent
12
import io.getunleash.android.http.Throttler
13
import io.getunleash.android.unleashScope
14
import kotlinx.coroutines.Dispatchers
15
import kotlinx.coroutines.channels.BufferOverflow
16
import kotlinx.coroutines.flow.MutableSharedFlow
17
import kotlinx.coroutines.flow.SharedFlow
18
import kotlinx.coroutines.flow.StateFlow
19
import kotlinx.coroutines.flow.asSharedFlow
20
import kotlinx.coroutines.flow.distinctUntilChanged
21
import kotlinx.coroutines.launch
22
import kotlinx.coroutines.suspendCancellableCoroutine
23
import kotlinx.coroutines.withContext
24
import okhttp3.Call
25
import okhttp3.Callback
26
import okhttp3.Headers.Companion.toHeaders
27
import okhttp3.HttpUrl
28
import okhttp3.HttpUrl.Companion.toHttpUrl
29
import okhttp3.OkHttpClient
30
import okhttp3.Request
31
import okhttp3.Response
32
import okhttp3.internal.closeQuietly
33
import java.io.Closeable
34
import java.io.IOException
35
import java.util.concurrent.TimeUnit
36
import java.util.concurrent.atomic.AtomicReference
37
import kotlin.coroutines.CoroutineContext
38
import kotlin.coroutines.resume
39
import kotlin.coroutines.resumeWithException
40

41
/**
 * Http client wrapper for fetching feature toggles from the Unleash proxy.
 *
 * A fetch is triggered either explicitly through [refreshToggles] or, after
 * [startWatchingContext] has been called, every time the observed [UnleashContext] changes.
 * Successfully parsed toggle states are published on [getFeaturesReceivedFlow]; the outcome of
 * every refresh attempt (including throttled ones) is published on [getHeartbeatFlow].
 *
 * @param unleashConfig configuration supplying the proxy URL, app name, application headers and
 * polling interval
 * @param httpClient - the http client to use for fetching toggles from Unleash proxy
 * @param unleashContext the context whose current value is sent as query parameters on each fetch
 */
open class UnleashFetcher(
    unleashConfig: UnleashConfig,
    private val httpClient: OkHttpClient,
    private val unleashContext: StateFlow<UnleashContext>,
) : Closeable {
    companion object {
        private const val TAG = "UnleashFetcher"
    }

    // Null when no proxy URL is configured; fetchToggles() then fails without issuing a request.
    private val proxyUrl = unleashConfig.proxyUrl?.toHttpUrl()
    private val applicationHeaders = unleashConfig.getApplicationHeaders(unleashConfig.pollingStrategy)
    private val appName = unleashConfig.appName

    // ETag of the last successfully parsed response, sent back as If-None-Match.
    // Volatile because fetches run on IO-dispatched coroutines and may overlap.
    @Volatile
    private var etag: String? = null

    // replay = 1 so that late subscribers receive the most recently fetched state.
    private val featuresReceivedFlow = MutableSharedFlow<UnleashState>(
        replay = 1,
        onBufferOverflow = BufferOverflow.DROP_OLDEST
    )
    private val fetcherHeartbeatFlow = MutableSharedFlow<HeartbeatEvent>(
        extraBufferCapacity = 5,
        onBufferOverflow = BufferOverflow.DROP_OLDEST
    )
    private val coroutineContextForContextChange: CoroutineContext = Dispatchers.IO

    // The most recently started call; a newer fetch cancels it if it has not been cancelled yet.
    private val currentCall = AtomicReference<Call?>(null)
    private val throttler =
        Throttler(
            TimeUnit.MILLISECONDS.toSeconds(unleashConfig.pollingStrategy.interval),
            longestAcceptableIntervalSeconds = 300,
            proxyUrl.toString()
        )

    /** Read-only view of the flow that receives every successfully fetched [UnleashState]. */
    fun getFeaturesReceivedFlow() = featuresReceivedFlow.asSharedFlow()

    /**
     * Starts a coroutine in [unleashScope] that refreshes the toggles every time the observed
     * context changes (and once for the context's current value).
     */
    fun startWatchingContext() {
        unleashScope.launch {
            // The lambda answers "are these two values equivalent?": equal contexts must be
            // reported as equivalent so that only genuine changes reach the collector.
            unleashContext.distinctUntilChanged { old, new -> old == new }.collect {
                withContext(coroutineContextForContextChange) {
                    Log.d(TAG, "Unleash context changed: $it")
                    refreshToggles()
                }
            }
        }
    }

    /**
     * Fetches toggles for the current context unless the throttler vetoes the request.
     *
     * A [HeartbeatEvent] describing the outcome is emitted in both cases.
     *
     * @return the fetch result, or a response with [Status.THROTTLED] when the request was skipped
     */
    suspend fun refreshToggles(): ToggleResponse {
        if (throttler.performAction()) {
            Log.d(TAG, "Refreshing toggles")
            val response = refreshTogglesWithContext(unleashContext.value)
            fetcherHeartbeatFlow.emit(HeartbeatEvent(response.status, response.error?.message))
            return response
        }
        Log.i(TAG, "Skipping refresh toggles due to throttling")
        fetcherHeartbeatFlow.emit(HeartbeatEvent(Status.THROTTLED))
        return ToggleResponse(Status.THROTTLED)
    }

    /**
     * Fetches toggles for [ctx] and, on success, publishes the new state on the
     * features-received flow.
     *
     * @param ctx the context to send to the proxy
     * @return the toggles keyed by name on success; otherwise the status and, if any, the error
     */
    internal suspend fun refreshTogglesWithContext(ctx: UnleashContext): ToggleResponse {
        val response = fetchToggles(ctx)
        if (response.isSuccess()) {
            // When several toggles share a name, the first one in the payload wins.
            // NOTE(review): relies on isSuccess() implying a non-null config - confirm.
            val toggles = response.config!!.toggles.groupBy { it.name }
                .mapValues { (_, v) -> v.first() }
            Log.d(
                TAG,
                "Fetched new state with ${toggles.size} toggles, emitting featuresReceivedFlow"
            )
            featuresReceivedFlow.emit(UnleashState(ctx, toggles))
            return ToggleResponse(response.status, toggles)
        }
        if (response.isFailed()) {
            if (response.error is NotAuthorizedException) {
                Log.e(TAG, "Not authorized to fetch toggles. Double check your SDK key")
            } else {
                Log.i(TAG, "Failed to fetch toggles ${response.error?.message}", response.error)
            }
        }
        return ToggleResponse(response.status, error = response.error)
    }

    /**
     * Performs the HTTP request for [ctx] and maps the outcome to a [FetchResponse].
     *
     * I/O failures, non-success status codes, a missing body and unparsable payloads are all
     * reported as [Status.FAILED] with the cause attached; a 304 maps to [Status.NOT_MODIFIED].
     */
    private suspend fun fetchToggles(ctx: UnleashContext): FetchResponse {
        if (proxyUrl == null) {
            return FetchResponse(Status.FAILED, error = IllegalStateException("Proxy URL is not set"))
        }
        val contextUrl = buildContextUrl(ctx)
        try {
            val request = Request.Builder().url(contextUrl)
                .headers(applicationHeaders.toHeaders())
            // Single read of the mutable field: no check-then-use gap and no `!!`.
            etag?.let { request.header("If-None-Match", it) }
            val call = this.httpClient.newCall(request.build())
            val inFlightCall = currentCall.get()
            if (!currentCall.compareAndSet(inFlightCall, call)) {
                // Another fetch registered its call between get() and compareAndSet(); give up.
                return FetchResponse(
                    Status.FAILED,
                    error = IllegalStateException("Failed to set new call while ${inFlightCall?.request()?.url} is in flight")
                )
            } else if (inFlightCall != null && !inFlightCall.isCanceled()) {
                Log.d(
                    TAG,
                    "Cancelling previous ${inFlightCall.request().method} ${inFlightCall.request().url}"
                )
                inFlightCall.cancel()
            }

            Log.d(TAG, "Fetching toggles from $contextUrl")
            val response = call.await()
            response.use { res ->
                Log.d(TAG, "Received status code ${res.code} from $contextUrl")
                throttler.handle(res.code)
                return when {
                    res.isSuccessful -> {
                        res.body?.use { b ->
                            try {
                                val proxyResponse: ProxyResponse =
                                    proxyResponseAdapter.fromJson(b.string())!!
                                // Remember the ETag only once the payload is usable. Storing it
                                // before parsing would let a later 304 hide a payload that was
                                // never applied.
                                etag = res.header("ETag")
                                FetchResponse(Status.SUCCESS, proxyResponse)
                            } catch (e: Exception) {
                                // If we fail to parse, just keep data
                                FetchResponse(Status.FAILED, error = e)
                            }
                        } ?: FetchResponse(Status.FAILED, error = NoBodyException())
                    }

                    res.code == 304 -> {
                        FetchResponse(Status.NOT_MODIFIED)
                    }

                    res.code == 401 -> {
                        FetchResponse(Status.FAILED, error = NotAuthorizedException())
                    }

                    else -> {
                        FetchResponse(Status.FAILED, error = ServerException(res.code))
                    }
                }
            }
        } catch (e: IOException) {
            return FetchResponse(status = Status.FAILED, error = e)
        }
    }

    /**
     * Suspends until this call completes, resuming with its [Response] or throwing the
     * [IOException] it failed with. Cancelling the coroutine cancels the call.
     */
    private suspend fun Call.await(): Response {
        return suspendCancellableCoroutine { continuation ->
            enqueue(object : Callback {
                override fun onResponse(call: Call, response: Response) {
                    // Nobody will consume the response once the continuation is cancelled,
                    // so release it here instead of leaking the connection.
                    if (continuation.isCancelled) {
                        response.close()
                        return
                    }
                    continuation.resume(response)
                }

                override fun onFailure(call: Call, e: IOException) {
                    // Don't bother with resuming the continuation if it is already cancelled.
                    if (continuation.isCancelled) return
                    continuation.resumeWithException(e)
                }
            })

            continuation.invokeOnCancellation {
                try {
                    cancel()
                } catch (ex: Throwable) {
                    //Ignore cancel exception
                }
            }
        }
    }

    /**
     * Builds the request URL: the proxy URL plus `appName`, the non-null standard context
     * fields and one `properties[key]` query parameter per custom property.
     */
    private fun buildContextUrl(ctx: UnleashContext): HttpUrl {
        val builder = checkNotNull(proxyUrl) { "Proxy URL is not set" }.newBuilder()
            .addQueryParameter("appName", appName)
        ctx.userId?.let { builder.addQueryParameter("userId", it) }
        ctx.remoteAddress?.let { builder.addQueryParameter("remoteAddress", it) }
        ctx.sessionId?.let { builder.addQueryParameter("sessionId", it) }
        ctx.properties.entries.forEach { property ->
            builder.addQueryParameter("properties[${property.key}]", property.value)
        }
        return builder.build()
    }

    /**
     * Shuts down the http client's dispatcher executor, evicts its pooled connections and
     * closes its cache, if it has one.
     */
    override fun close() {
        httpClient.dispatcher.executorService.shutdownNow()
        httpClient.connectionPool.evictAll()
        httpClient.cache?.closeQuietly()
    }

    /** Read-only view of the flow that receives one [HeartbeatEvent] per refresh attempt. */
    fun getHeartbeatFlow(): SharedFlow<HeartbeatEvent> {
        return fetcherHeartbeatFlow.asSharedFlow()
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc