• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Unleash / unleash-android / 10077204014

24 Jul 2024 01:04PM UTC coverage: 78.922% (+2.4%) from 76.546%
10077204014

Pull #66

github

web-flow
Merge 6f0bba963 into 812f882d3
Pull Request #66: chore: several test cases in 3 commits

197 of 280 branches covered (70.36%)

Branch coverage included in aggregate %.

6 of 6 new or added lines in 2 files covered. (100.0%)

6 existing lines in 1 file now uncovered.

608 of 740 relevant lines covered (82.16%)

5.32 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.65
/unleashandroidsdk/src/main/java/io/getunleash/android/polling/UnleashFetcher.kt
1
package io.getunleash.android.polling
2

3
import android.util.Log
4
import com.fasterxml.jackson.module.kotlin.readValue
5
import io.getunleash.android.UnleashConfig
6
import io.getunleash.android.data.Parser
7
import io.getunleash.android.data.UnleashContext
8
import io.getunleash.android.data.UnleashState
9
import io.getunleash.android.errors.NoBodyException
10
import io.getunleash.android.errors.NotAuthorizedException
11
import io.getunleash.android.errors.ServerException
12
import io.getunleash.android.events.HeartbeatEvent
13
import io.getunleash.android.http.Throttler
14
import io.getunleash.android.unleashScope
15
import kotlinx.coroutines.Dispatchers
16
import kotlinx.coroutines.channels.BufferOverflow
17
import kotlinx.coroutines.flow.MutableSharedFlow
18
import kotlinx.coroutines.flow.SharedFlow
19
import kotlinx.coroutines.flow.StateFlow
20
import kotlinx.coroutines.flow.asSharedFlow
21
import kotlinx.coroutines.flow.distinctUntilChanged
22
import kotlinx.coroutines.launch
23
import kotlinx.coroutines.suspendCancellableCoroutine
24
import kotlinx.coroutines.withContext
25
import okhttp3.Call
26
import okhttp3.Callback
27
import okhttp3.Headers.Companion.toHeaders
28
import okhttp3.HttpUrl
29
import okhttp3.HttpUrl.Companion.toHttpUrl
30
import okhttp3.OkHttpClient
31
import okhttp3.Request
32
import okhttp3.Response
33
import okhttp3.internal.closeQuietly
34
import java.io.Closeable
35
import java.io.IOException
36
import java.util.concurrent.TimeUnit
37
import java.util.concurrent.atomic.AtomicReference
38
import kotlin.coroutines.CoroutineContext
39
import kotlin.coroutines.resume
40
import kotlin.coroutines.resumeWithException
41

42
/**
43
 * Http Client for fetching data from Unleash Proxy.
44
 * By default creates an OkHttpClient with readTimeout set to 2 seconds and a cache of 10 MBs
45
 * @param httpClient - the http client to use for fetching toggles from Unleash proxy
46
 */
47
open class UnleashFetcher(
3✔
48
    unleashConfig: UnleashConfig,
49
    private val httpClient: OkHttpClient,
3✔
50
    private val unleashContext: StateFlow<UnleashContext>,
3✔
51
) : Closeable {
52
    companion object {
53
        private const val TAG = "UnleashFetcher"
54
    }
55

56
    private val proxyUrl = unleashConfig.proxyUrl.toHttpUrl()
6✔
57
    private val applicationHeaders = unleashConfig.getApplicationHeaders(unleashConfig.pollingStrategy)
6✔
58
    private val appName = unleashConfig.appName
4✔
59
    private var etag: String? = null
60
    private val featuresReceivedFlow = MutableSharedFlow<UnleashState>(
6✔
61
        replay = 1,
1✔
62
        onBufferOverflow = BufferOverflow.DROP_OLDEST
1✔
63
    )
64
    private val fetcherHeartbeatFlow = MutableSharedFlow<HeartbeatEvent>(
6✔
65
        extraBufferCapacity = 5,
1✔
66
        onBufferOverflow = BufferOverflow.DROP_OLDEST
1✔
67
    )
68
    private val coroutineContextForContextChange: CoroutineContext = Dispatchers.IO
4✔
69
    private val currentCall = AtomicReference<Call?>(null)
6✔
70
    private val throttler =
71
        Throttler(
5✔
72
            TimeUnit.MILLISECONDS.toSeconds(unleashConfig.pollingStrategy.interval),
5✔
73
            longestAcceptableIntervalSeconds = 300,
1✔
74
            proxyUrl.toString()
3✔
75
        )
76

77
    fun getFeaturesReceivedFlow() = featuresReceivedFlow.asSharedFlow()
4✔
78

79
    fun startWatchingContext() {
80
        unleashScope.launch {
13✔
81
            unleashContext.distinctUntilChanged { old, new -> old != new }.collect {
27!
82
                withContext(coroutineContextForContextChange) {
13✔
83
                    Log.d(TAG, "Unleash context changed: $it")
12✔
84
                    refreshToggles()
8✔
85
                }
3✔
86
            }
×
87
        }
×
88
    }
1✔
89

90
    suspend fun refreshToggles(): ToggleResponse {
2✔
91
        if (throttler.performAction()) {
4✔
92
            Log.d(TAG, "Refreshing toggles")
4✔
93
            val response = refreshTogglesWithContext(unleashContext.value)
13✔
94
            fetcherHeartbeatFlow.emit(HeartbeatEvent(response.status, response.error?.message))
23✔
95
            return response
3✔
96
        }
97
        Log.i(TAG, "Skipping refresh toggles due to throttling")
4✔
98
        fetcherHeartbeatFlow.emit(HeartbeatEvent(Status.THROTTLED))
14✔
99
        return ToggleResponse(Status.THROTTLED)
10✔
100
    }
101

102
    internal suspend fun refreshTogglesWithContext(ctx: UnleashContext): ToggleResponse {
2✔
103
        val response = fetchToggles(ctx)
13✔
104
        if (response.isSuccess()) {
3✔
105

106
            val toggles = response.config!!.toggles.groupBy { it.name }
12✔
107
                .mapValues { (_, v) -> v.first() }
13✔
108
            Log.d(
2✔
109
                TAG,
1✔
110
                "Fetched new state with ${toggles.size} toggles, emitting featuresReceivedFlow"
11✔
111
            )
112
            featuresReceivedFlow.emit(UnleashState(ctx, toggles))
18✔
113
            return ToggleResponse(response.status, toggles)
11✔
114
        } else {
115
            if (response.isFailed()) {
3✔
116
                if (response.error is NotAuthorizedException) {
4!
117
                    Log.e(TAG, "Not authorized to fetch toggles. Double check your SDK key")
×
118
                } else {
119
                    Log.i(TAG, "Failed to fetch toggles ${response.error?.message}", response.error)
19!
120
                }
121
            }
122
        }
123
        return ToggleResponse(response.status, error = response.error)
11✔
124
    }
125

126
    private suspend fun fetchToggles(ctx: UnleashContext): FetchResponse {
2✔
127
        val contextUrl = buildContextUrl(ctx)
4✔
128
        try {
1✔
129
            val request = Request.Builder().url(contextUrl)
6✔
130
                .headers(applicationHeaders.toHeaders())
5✔
131
            if (etag != null) {
3!
132
                request.header("If-None-Match", etag!!)
×
133
            }
134
            val call = this.httpClient.newCall(request.build())
6✔
135
            val inFlightCall = currentCall.get()
5✔
136
            if (!currentCall.compareAndSet(inFlightCall, call)) {
6!
137
                return FetchResponse(
×
138
                    Status.FAILED,
×
139
                    error = IllegalStateException("Failed to set new call while ${inFlightCall?.request()?.url} is in flight")
×
140
                )
141
            } else if (inFlightCall != null && !inFlightCall.isCanceled()) {
5!
142
                Log.d(
2✔
143
                    TAG,
1✔
144
                    "Cancelling previous ${inFlightCall.request().method} ${inFlightCall.request().url}"
16✔
145
                )
146
                inFlightCall.cancel()
2✔
147
            }
148

149
            Log.d(TAG, "Fetching toggles from $contextUrl")
11✔
150
            val response = call.await()
13✔
151
            response.use { res ->
11✔
152
                Log.d(TAG, "Received status code ${res.code} from $contextUrl")
16✔
153
                throttler.handle(response.code)
5✔
154
                return when {
7✔
155
                    res.isSuccessful -> {
3✔
156
                        etag = res.header("ETag")
8✔
157
                        res.body?.use { b ->
21!
158
                            try {
2✔
159
                                val proxyResponse: ProxyResponse =
1✔
160
                                    Parser.jackson.readValue(b.string())
9✔
161
                                FetchResponse(Status.SUCCESS, proxyResponse)
10✔
162
                            } catch (e: Exception) {
×
163
                                // If we fail to parse, just keep data
164
                                FetchResponse(Status.FAILED, error = e)
1✔
165
                            }
166
                        } ?: FetchResponse(Status.FAILED, error = NoBodyException())
×
167
                    }
168

169
                    res.code == 304 -> {
4✔
170
                        FetchResponse(Status.NOT_MODIFIED)
9✔
171
                    }
172

173
                    res.code == 401 -> {
4!
174
                        FetchResponse(Status.FAILED, error = NotAuthorizedException())
×
175
                    }
176

177
                    else -> {
178
                        FetchResponse(Status.FAILED, error = ServerException(res.code))
13✔
179
                    }
180
                }
181
            }
182
        } catch (e: IOException) {
1✔
183
            return FetchResponse(status = Status.FAILED, error = e)
10✔
184
        }
185
    }
186

187
    private suspend fun Call.await(): Response {
188
        return suspendCancellableCoroutine { continuation ->
3✔
189
            enqueue(object : Callback {
10✔
190
                override fun onResponse(call: Call, response: Response) {
191
                    continuation.resume(response)
8✔
192
                }
1✔
193

194
                override fun onFailure(call: Call, e: IOException) {
195
                    // Don't bother with resuming the continuation if it is already cancelled.
196
                    if (continuation.isCancelled) return
4!
197
                    continuation.resumeWithException(e)
10✔
198
                }
1✔
199
            })
200

201
            continuation.invokeOnCancellation {
7✔
202
                try {
×
203
                    cancel()
×
204
                } catch (ex: Throwable) {
×
205
                    //Ignore cancel exception
206
                }
207
            }
×
208
        }
1✔
209
    }
210

211
    private fun buildContextUrl(ctx: UnleashContext): HttpUrl {
212
        var contextUrl = proxyUrl.newBuilder()
6✔
213
            .addQueryParameter("appName", appName)
4✔
214
        if (ctx.userId != null) {
3✔
215
            contextUrl.addQueryParameter("userId", ctx.userId)
6✔
216
        }
217
        if (ctx.remoteAddress != null) {
3!
218
            contextUrl.addQueryParameter("remoteAddress", ctx.remoteAddress)
×
219
        }
220
        if (ctx.sessionId != null) {
3!
221
            contextUrl.addQueryParameter("sessionId", ctx.sessionId)
×
222
        }
223
        ctx.properties.entries.forEach {
7✔
224
            contextUrl = contextUrl.addQueryParameter("properties[${it.key}]", it.value)
×
225
        }
×
226
        return contextUrl.build()
3✔
227
    }
228

229
    override fun close() {
230
        httpClient.dispatcher.executorService.shutdownNow()
×
231
        httpClient.connectionPool.evictAll()
×
232
        httpClient.cache?.closeQuietly()
×
233
    }
×
234

235
    fun getHeartbeatFlow(): SharedFlow<HeartbeatEvent> {
236
        return fetcherHeartbeatFlow.asSharedFlow()
4✔
237
    }
238
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc