Introduce Flow.asLiveData() terminal operator.
It inherits semantics and behavior from livedata {} builder.
Test: FlowLiveDataTest
Change-Id: I7c46bed89a81cd0c8550a5728061dae9fed2fb95
diff --git a/buildSrc/src/main/kotlin/androidx/build/dependencies/Dependencies.kt b/buildSrc/src/main/kotlin/androidx/build/dependencies/Dependencies.kt
index 2679c13..dced908 100644
--- a/buildSrc/src/main/kotlin/androidx/build/dependencies/Dependencies.kt
+++ b/buildSrc/src/main/kotlin/androidx/build/dependencies/Dependencies.kt
@@ -71,10 +71,14 @@
"org.jetbrains.kotlinx:kotlinx-coroutines-guava:$KOTLIN_COROUTINES_VERSION"
const val KOTLIN_COROUTINES_TEST =
"org.jetbrains.kotlinx:kotlinx-coroutines-test:$KOTLIN_COROUTINES_VERSION"
+
+private const val KOTLIN_COROUTINES_PREVIEW_VERSION = "1.3.0-RC"
const val KOTLIN_COROUTINES_PREVIEW =
- "org.jetbrains.kotlinx:kotlinx-coroutines-android:1.3.0-RC"
+ "org.jetbrains.kotlinx:kotlinx-coroutines-android:$KOTLIN_COROUTINES_PREVIEW_VERSION"
+const val KOTLIN_COROUTINES_CORE_PREVIEW =
+ "org.jetbrains.kotlinx:kotlinx-coroutines-core:$KOTLIN_COROUTINES_PREVIEW_VERSION"
const val KOTLIN_COROUTINES_TEST_PREVIEW =
- "org.jetbrains.kotlinx:kotlinx-coroutines-test:1.3.0-RC"
+ "org.jetbrains.kotlinx:kotlinx-coroutines-test:$KOTLIN_COROUTINES_PREVIEW_VERSION"
const val LEAKCANARY_INSTRUMENTATION =
"com.squareup.leakcanary:leakcanary-android-instrumentation:1.6.2"
diff --git a/lifecycle/lifecycle-livedata-ktx/api/2.2.0-alpha03.txt b/lifecycle/lifecycle-livedata-ktx/api/2.2.0-alpha03.txt
index 9dc2bee..5aec16c 100644
--- a/lifecycle/lifecycle-livedata-ktx/api/2.2.0-alpha03.txt
+++ b/lifecycle/lifecycle-livedata-ktx/api/2.2.0-alpha03.txt
@@ -11,6 +11,16 @@
method public static <T> androidx.lifecycle.LiveData<T> liveData(kotlin.coroutines.CoroutineContext context = EmptyCoroutineContext, long timeoutInMs = 5000L, kotlin.jvm.functions.Function2<? super androidx.lifecycle.LiveDataScope<T>,? super kotlin.coroutines.Continuation<? super kotlin.Unit>,?> block);
}
+ public final class FlowLiveDataApi26Kt {
+ ctor public FlowLiveDataApi26Kt();
+ method @RequiresApi(android.os.Build.VERSION_CODES.O) public static <T> androidx.lifecycle.LiveData<T> asLiveData(kotlinx.coroutines.flow.Flow<? extends T>, kotlin.coroutines.CoroutineContext context = EmptyCoroutineContext, java.time.Duration timeout);
+ }
+
+ public final class FlowLiveDataKt {
+ ctor public FlowLiveDataKt();
+ method public static <T> androidx.lifecycle.LiveData<T> asLiveData(kotlinx.coroutines.flow.Flow<? extends T>, kotlin.coroutines.CoroutineContext context = EmptyCoroutineContext, long timeoutInMs = 5000L);
+ }
+
public interface LiveDataScope<T> {
method public suspend Object! emit(T? value, kotlin.coroutines.Continuation<? super kotlin.Unit> p);
method public suspend Object emitSource(androidx.lifecycle.LiveData<T> source, kotlin.coroutines.Continuation<? super kotlinx.coroutines.DisposableHandle> p);
diff --git a/lifecycle/lifecycle-livedata-ktx/api/current.txt b/lifecycle/lifecycle-livedata-ktx/api/current.txt
index 9dc2bee..5aec16c 100644
--- a/lifecycle/lifecycle-livedata-ktx/api/current.txt
+++ b/lifecycle/lifecycle-livedata-ktx/api/current.txt
@@ -11,6 +11,16 @@
method public static <T> androidx.lifecycle.LiveData<T> liveData(kotlin.coroutines.CoroutineContext context = EmptyCoroutineContext, long timeoutInMs = 5000L, kotlin.jvm.functions.Function2<? super androidx.lifecycle.LiveDataScope<T>,? super kotlin.coroutines.Continuation<? super kotlin.Unit>,?> block);
}
+ public final class FlowLiveDataApi26Kt {
+ ctor public FlowLiveDataApi26Kt();
+ method @RequiresApi(android.os.Build.VERSION_CODES.O) public static <T> androidx.lifecycle.LiveData<T> asLiveData(kotlinx.coroutines.flow.Flow<? extends T>, kotlin.coroutines.CoroutineContext context = EmptyCoroutineContext, java.time.Duration timeout);
+ }
+
+ public final class FlowLiveDataKt {
+ ctor public FlowLiveDataKt();
+ method public static <T> androidx.lifecycle.LiveData<T> asLiveData(kotlinx.coroutines.flow.Flow<? extends T>, kotlin.coroutines.CoroutineContext context = EmptyCoroutineContext, long timeoutInMs = 5000L);
+ }
+
public interface LiveDataScope<T> {
method public suspend Object! emit(T? value, kotlin.coroutines.Continuation<? super kotlin.Unit> p);
method public suspend Object emitSource(androidx.lifecycle.LiveData<T> source, kotlin.coroutines.Continuation<? super kotlinx.coroutines.DisposableHandle> p);
diff --git a/lifecycle/lifecycle-livedata-ktx/api/restricted_2.2.0-alpha03.txt b/lifecycle/lifecycle-livedata-ktx/api/restricted_2.2.0-alpha03.txt
index 9dc2bee..5aec16c 100644
--- a/lifecycle/lifecycle-livedata-ktx/api/restricted_2.2.0-alpha03.txt
+++ b/lifecycle/lifecycle-livedata-ktx/api/restricted_2.2.0-alpha03.txt
@@ -11,6 +11,16 @@
method public static <T> androidx.lifecycle.LiveData<T> liveData(kotlin.coroutines.CoroutineContext context = EmptyCoroutineContext, long timeoutInMs = 5000L, kotlin.jvm.functions.Function2<? super androidx.lifecycle.LiveDataScope<T>,? super kotlin.coroutines.Continuation<? super kotlin.Unit>,?> block);
}
+ public final class FlowLiveDataApi26Kt {
+ ctor public FlowLiveDataApi26Kt();
+ method @RequiresApi(android.os.Build.VERSION_CODES.O) public static <T> androidx.lifecycle.LiveData<T> asLiveData(kotlinx.coroutines.flow.Flow<? extends T>, kotlin.coroutines.CoroutineContext context = EmptyCoroutineContext, java.time.Duration timeout);
+ }
+
+ public final class FlowLiveDataKt {
+ ctor public FlowLiveDataKt();
+ method public static <T> androidx.lifecycle.LiveData<T> asLiveData(kotlinx.coroutines.flow.Flow<? extends T>, kotlin.coroutines.CoroutineContext context = EmptyCoroutineContext, long timeoutInMs = 5000L);
+ }
+
public interface LiveDataScope<T> {
method public suspend Object! emit(T? value, kotlin.coroutines.Continuation<? super kotlin.Unit> p);
method public suspend Object emitSource(androidx.lifecycle.LiveData<T> source, kotlin.coroutines.Continuation<? super kotlinx.coroutines.DisposableHandle> p);
diff --git a/lifecycle/lifecycle-livedata-ktx/api/restricted_current.txt b/lifecycle/lifecycle-livedata-ktx/api/restricted_current.txt
index 9dc2bee..5aec16c 100644
--- a/lifecycle/lifecycle-livedata-ktx/api/restricted_current.txt
+++ b/lifecycle/lifecycle-livedata-ktx/api/restricted_current.txt
@@ -11,6 +11,16 @@
method public static <T> androidx.lifecycle.LiveData<T> liveData(kotlin.coroutines.CoroutineContext context = EmptyCoroutineContext, long timeoutInMs = 5000L, kotlin.jvm.functions.Function2<? super androidx.lifecycle.LiveDataScope<T>,? super kotlin.coroutines.Continuation<? super kotlin.Unit>,?> block);
}
+ public final class FlowLiveDataApi26Kt {
+ ctor public FlowLiveDataApi26Kt();
+ method @RequiresApi(android.os.Build.VERSION_CODES.O) public static <T> androidx.lifecycle.LiveData<T> asLiveData(kotlinx.coroutines.flow.Flow<? extends T>, kotlin.coroutines.CoroutineContext context = EmptyCoroutineContext, java.time.Duration timeout);
+ }
+
+ public final class FlowLiveDataKt {
+ ctor public FlowLiveDataKt();
+ method public static <T> androidx.lifecycle.LiveData<T> asLiveData(kotlinx.coroutines.flow.Flow<? extends T>, kotlin.coroutines.CoroutineContext context = EmptyCoroutineContext, long timeoutInMs = 5000L);
+ }
+
public interface LiveDataScope<T> {
method public suspend Object! emit(T? value, kotlin.coroutines.Continuation<? super kotlin.Unit> p);
method public suspend Object emitSource(androidx.lifecycle.LiveData<T> source, kotlin.coroutines.Continuation<? super kotlinx.coroutines.DisposableHandle> p);
diff --git a/lifecycle/lifecycle-livedata-ktx/build.gradle b/lifecycle/lifecycle-livedata-ktx/build.gradle
index 0c72bc8..cf0517c 100644
--- a/lifecycle/lifecycle-livedata-ktx/build.gradle
+++ b/lifecycle/lifecycle-livedata-ktx/build.gradle
@@ -41,7 +41,7 @@
api(project(":lifecycle:lifecycle-livedata"))
api(project(":lifecycle:lifecycle-livedata-core-ktx"))
api(KOTLIN_STDLIB)
- api(KOTLIN_COROUTINES_CORE)
+ api(KOTLIN_COROUTINES_CORE_PREVIEW)
testImplementation(project(":lifecycle:lifecycle-runtime"))
testImplementation("androidx.arch.core:core-testing:2.1.0-rc01")
testImplementation(JUNIT)
diff --git a/lifecycle/lifecycle-livedata-ktx/src/main/java/androidx/lifecycle/FlowLiveData.kt b/lifecycle/lifecycle-livedata-ktx/src/main/java/androidx/lifecycle/FlowLiveData.kt
new file mode 100644
index 0000000..d14c561
--- /dev/null
+++ b/lifecycle/lifecycle-livedata-ktx/src/main/java/androidx/lifecycle/FlowLiveData.kt
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2019 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package androidx.lifecycle
+
+import kotlinx.coroutines.Dispatchers
+import kotlinx.coroutines.flow.Flow
+import kotlinx.coroutines.flow.collect
+import kotlin.coroutines.CoroutineContext
+import kotlin.coroutines.EmptyCoroutineContext
+
+/**
+ * Creates a LiveData that has values collected from the origin [Flow].
+ *
+ * The upstream flow collection starts when the returned [LiveData] becomes active
+ * ([LiveData.onActive]).
+ * If the [LiveData] becomes inactive ([LiveData.onInactive]) while the flow has not completed,
+ * the flow collection will be cancelled after [timeoutInMs] milliseconds unless the [LiveData]
+ * becomes active again before that timeout (to gracefully handle cases like Activity rotation).
+ *
+ * After a cancellation, if the [LiveData] becomes active again, the upstream flow collection will
+ * be re-executed.
+ *
+ * If the upstream flow completes successfully *or* is cancelled due to reasons other than
+ * [LiveData] becoming inactive, it *will not* be re-collected even after [LiveData] goes through
+ * active inactive cycle.
+ *
+ * If flow completes with an exception, then exception will be delivered to the
+ * [CoroutineExceptionHandler][kotlinx.coroutines.CoroutineExceptionHandler] of provided [context].
+ * By default [EmptyCoroutineContext] is used to so an exception will be delivered to main's
+ * thread [UncaughtExceptionHandler][Thread.UncaughtExceptionHandler]. If your flow upstream is
+ * expected to throw, you can use [catch operator][kotlinx.coroutines.flow.catch] on upstream flow
+ * to emit a helpful error object.
+ *
+ * @param context The CoroutineContext to collect the upstream flow in. Defaults to
+ * [EmptyCoroutineContext] combined with [Dispatchers.Main]
+ * @param timeoutInMs The timeout in ms before cancelling the block if there are no active observers
+ * ([LiveData.hasActiveObservers]. Defaults to [DEFAULT_TIMEOUT].
+ */
+fun <T> Flow<T>.asLiveData(
+ context: CoroutineContext = EmptyCoroutineContext,
+ timeoutInMs: Long = DEFAULT_TIMEOUT
+): LiveData<T> = liveData(context, timeoutInMs) {
+ collect {
+ emit(it)
+ }
+}
\ No newline at end of file
diff --git a/lifecycle/lifecycle-livedata-ktx/src/main/java/androidx/lifecycle/FlowLiveDataApi26.kt b/lifecycle/lifecycle-livedata-ktx/src/main/java/androidx/lifecycle/FlowLiveDataApi26.kt
new file mode 100644
index 0000000..ac2643d
--- /dev/null
+++ b/lifecycle/lifecycle-livedata-ktx/src/main/java/androidx/lifecycle/FlowLiveDataApi26.kt
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2019 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package androidx.lifecycle
+
+import android.os.Build
+import androidx.annotation.RequiresApi
+import kotlinx.coroutines.Dispatchers
+import kotlinx.coroutines.flow.Flow
+import java.time.Duration
+import kotlin.coroutines.CoroutineContext
+import kotlin.coroutines.EmptyCoroutineContext
+
+/**
+ * Creates a LiveData that has values collected from the origin [Flow].
+ *
+ * The upstream flow collection starts when the returned [LiveData] becomes active
+ * ([LiveData.onActive]).
+ * If the [LiveData] becomes inactive ([LiveData.onInactive]) while the flow has not completed,
+ * the flow collection will be cancelled after [timeout] unless the [LiveData]
+ * becomes active again before that timeout (to gracefully handle cases like Activity rotation).
+ *
+ * After a cancellation, if the [LiveData] becomes active again, the upstream flow collection will
+ * be re-executed.
+ *
+ * If the upstream flow completes successfully *or* is cancelled due to reasons other than
+ * [LiveData] becoming inactive, it *will not* be re-collected even after [LiveData] goes through
+ * active inactive cycle.
+ *
+ * If flow completes with an exception, then exception will be delivered to the
+ * [CoroutineExceptionHandler][kotlinx.coroutines.CoroutineExceptionHandler] of provided [context].
+ * By default [EmptyCoroutineContext] is used to so an exception will be delivered to main's
+ * thread [UncaughtExceptionHandler][Thread.UncaughtExceptionHandler]. If your flow upstream is
+ * expected to throw, you can use [catch operator][kotlinx.coroutines.flow.catch] on upstream flow
+ * to emit a helpful error object.
+ *
+ * @param context The CoroutineContext to collect the upstream flow in. Defaults to
+ * [EmptyCoroutineContext] combined with [Dispatchers.Main]
+ * @param timeout The timeout in ms before cancelling the block if there are no active observers
+ * ([LiveData.hasActiveObservers]. Defaults to [DEFAULT_TIMEOUT].
+ */
+@RequiresApi(Build.VERSION_CODES.O)
+fun <T> Flow<T>.asLiveData(
+ context: CoroutineContext = EmptyCoroutineContext,
+ timeout: Duration
+): LiveData<T> = asLiveData(context, timeout.toMillis())
\ No newline at end of file
diff --git a/lifecycle/lifecycle-livedata-ktx/src/test/java/androidx/lifecycle/BuildLiveDataTest.kt b/lifecycle/lifecycle-livedata-ktx/src/test/java/androidx/lifecycle/BuildLiveDataTest.kt
index d0e6b8c..9e55611 100644
--- a/lifecycle/lifecycle-livedata-ktx/src/test/java/androidx/lifecycle/BuildLiveDataTest.kt
+++ b/lifecycle/lifecycle-livedata-ktx/src/test/java/androidx/lifecycle/BuildLiveDataTest.kt
@@ -16,26 +16,16 @@
package androidx.lifecycle
-import androidx.arch.core.executor.ArchTaskExecutor
-import androidx.arch.core.executor.TaskExecutor
import com.google.common.truth.Truth.assertThat
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineExceptionHandler
-import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
-import kotlinx.coroutines.async
import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay
-import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
-import kotlinx.coroutines.test.TestCoroutineDispatcher
-import kotlinx.coroutines.test.TestCoroutineScope
-import kotlinx.coroutines.test.resetMain
-import kotlinx.coroutines.test.setMain
-import org.junit.After
-import org.junit.Before
+import org.junit.Rule
import org.junit.Test
import org.junit.runner.RunWith
import org.junit.runners.JUnit4
@@ -47,53 +37,17 @@
@ExperimentalCoroutinesApi
@RunWith(JUnit4::class)
class BuildLiveDataTest {
- private val mainDispatcher = TestCoroutineDispatcher()
- private val mainScope = TestCoroutineScope(mainDispatcher)
-
- private val testDispatcher = TestCoroutineDispatcher()
- private val testScope = TestCoroutineScope(testDispatcher)
-
- @Before
- fun initMain() {
- Dispatchers.setMain(mainDispatcher)
- ArchTaskExecutor.getInstance().setDelegate(
- object : TaskExecutor() {
- override fun executeOnDiskIO(runnable: Runnable) {
- error("unsupported")
- }
-
- override fun postToMainThread(runnable: Runnable) {
- mainScope.launch {
- runnable.run()
- }
- }
-
- override fun isMainThread(): Boolean {
- // we have only one thread in this test.
- return true
- }
- }
- )
- // manually roll the time
- mainDispatcher.pauseDispatcher()
- testScope.pauseDispatcher()
- }
-
- @After
- fun clear() {
- advanceTimeBy(100000)
- mainScope.cleanupTestCoroutines()
- testScope.cleanupTestCoroutines()
- ArchTaskExecutor.getInstance().setDelegate(null)
- Dispatchers.resetMain()
- }
+ @get:Rule
+ val scopes = ScopesRule()
+ private val mainScope = scopes.mainScope
+ private val testScope = scopes.testScope
@Test
fun oneShot() {
val liveData = liveData {
emit(3)
}
- triggerAllActions()
+ scopes.triggerAllActions()
assertThat(liveData.value).isNull()
liveData.addObserver().assertItems(3)
}
@@ -114,7 +68,7 @@
mainScope.advanceTimeBy(100)
assertThat(ld.hasActiveObservers()).isFalse()
ld.addObserver().apply {
- triggerAllActions()
+ scopes.triggerAllActions()
assertItems(2, 1, 2)
mainScope.advanceTimeBy(1001)
assertItems(2, 1, 2, 3)
@@ -183,19 +137,19 @@
}
}
ld.addObserver().apply {
- triggerAllActions()
+ scopes.triggerAllActions()
assertItems(1)
unsubscribe()
cancelMutex.unlock()
}
// let cancellation take place
- triggerAllActions()
+ scopes.triggerAllActions()
// emit should immediately trigger cancellation to happen
assertThat(ld.value).isEqualTo(1)
assertThat(ld.hasActiveObservers()).isFalse()
// now because it was cancelled, re-observing should dispatch 1,1,2
ld.addObserver().apply {
- triggerAllActions()
+ scopes.triggerAllActions()
assertItems(1, 1, 2)
}
}
@@ -206,11 +160,11 @@
val ld = liveData<Int>(testScope.coroutineContext) {
latest.set(latestValue)
}
- runOnMain {
+ scopes.runOnMain {
ld.value = 3
}
ld.addObserver()
- triggerAllActions()
+ scopes.triggerAllActions()
assertThat(latest.get()).isEqualTo(3)
}
@@ -222,7 +176,7 @@
latest.set(latestValue)
}
ld.addObserver()
- triggerAllActions()
+ scopes.triggerAllActions()
assertThat(latest.get()).isEqualTo(5)
}
@@ -238,16 +192,16 @@
latest.set(latestValue)
}
ld.addObserver().apply {
- triggerAllActions()
+ scopes.triggerAllActions()
assertItems(5)
unsubscribe()
}
- triggerAllActions()
+ scopes.triggerAllActions()
// wait for it to be cancelled
- advanceTimeBy(10)
+ scopes.advanceTimeBy(10)
assertThat(latest.get()).isNull()
ld.addObserver()
- triggerAllActions()
+ scopes.triggerAllActions()
assertThat(latest.get()).isEqualTo(5)
}
@@ -288,7 +242,7 @@
emitSource(evens)
}
ld.addObserver().apply {
- triggerAllActions()
+ scopes.triggerAllActions()
assertItems(1, 3, 5, 7, 9, 2, 4, 6, 8, 10)
}
}
@@ -310,8 +264,8 @@
emit(10)
}
ld.addObserver().apply {
- triggerAllActions()
- advanceTimeBy(100)
+ scopes.triggerAllActions()
+ scopes.advanceTimeBy(100)
assertItems(1, 3, 5, 7, 9, 10)
}
}
@@ -328,10 +282,10 @@
}
val ld = liveData {
val disposable = emitSource(odds)
- triggerAllActions()
+ scopes.triggerAllActions()
assertThat(odds.hasActiveObservers()).isEqualTo(true)
disposable.dispose()
- triggerAllActions()
+ scopes.triggerAllActions()
assertThat(odds.hasActiveObservers()).isEqualTo(false)
doneOddsYield.unlock()
}
@@ -349,16 +303,16 @@
}
val ld = liveData {
val disposable = emitSource(odds)
- triggerAllActions()
+ scopes.triggerAllActions()
disposable.dispose()
- triggerAllActions()
+ scopes.triggerAllActions()
assertThat(odds.hasActiveObservers()).isEqualTo(false)
// add observer via side channel.
(this as LiveDataScopeImpl<Int>).target.addSource(odds) {}
- triggerAllActions()
+ scopes.triggerAllActions()
// redispose previous one should not impact
disposable.dispose()
- triggerAllActions()
+ scopes.triggerAllActions()
// still has the observer we added from the side channel
assertThat(odds.hasActiveObservers()).isEqualTo(true)
}
@@ -385,16 +339,16 @@
}
}
ld.addObserver().apply {
- triggerAllActions()
+ scopes.triggerAllActions()
assertItems()
runBlocking {
assertThat(exception.await()).hasMessageThat().contains("i like to fail")
}
unsubscribe()
}
- triggerAllActions()
+ scopes.triggerAllActions()
ld.addObserver().apply {
- triggerAllActions()
+ scopes.triggerAllActions()
assertItems()
}
}
@@ -412,14 +366,14 @@
}
}
ld.addObserver().apply {
- triggerAllActions()
+ scopes.triggerAllActions()
assertItems()
unsubscribe()
}
assertThat(didCancel.get()).isTrue()
ld.addObserver()
// trigger cancelation
- advanceTimeBy(11)
+ scopes.advanceTimeBy(11)
assertThat(unexpected.get()).isFalse()
}
@@ -441,69 +395,21 @@
}
ld.addObserver().apply {
assertItems()
- runOnMain {
+ scopes.runOnMain {
src.value = 1
}
- triggerAllActions()
+ scopes.triggerAllActions()
runBlocking {
assertThat(exception.await()).hasMessageThat().contains("i like to fail")
}
- runOnMain {
+ scopes.runOnMain {
src.value = 2
}
- triggerAllActions()
+ scopes.triggerAllActions()
assertItems(3)
}
}
- private fun triggerAllActions() {
- do {
- mainScope.runCurrent()
- testScope.runCurrent()
- val allIdle = listOf(mainDispatcher, testDispatcher).all {
- it.isIdle()
- }
- } while (!allIdle)
- }
-
- private fun advanceTimeBy(time: Long) {
- mainScope.advanceTimeBy(time)
- testScope.advanceTimeBy(time)
- triggerAllActions()
- }
-
- private fun TestCoroutineDispatcher.isIdle(): Boolean {
- val queueField = this::class.java
- .getDeclaredField("queue")
- queueField.isAccessible = true
- val queue = queueField.get(this)
- val peekMethod = queue::class.java
- .getDeclaredMethod("peek")
- val nextTask = peekMethod.invoke(queue) ?: return true
- val timeField = nextTask::class.java.getDeclaredField("time")
- timeField.isAccessible = true
- val time = timeField.getLong(nextTask)
- return time > testDispatcher.currentTime
- }
-
- private fun <T> runOnMain(block: () -> T): T {
- return runBlocking {
- val async = mainScope.async {
- block()
- }
- mainScope.runCurrent()
- async.await()
- }
- }
-
- private fun <T> LiveData<T>.addObserver(): CollectingObserver<T> {
- return runOnMain {
- val observer = CollectingObserver(this)
- observeForever(observer)
- observer
- }
- }
-
@Test
fun multipleValuesAndObservers() {
val ld = liveData {
@@ -515,20 +421,5 @@
ld.addObserver().assertItems(4)
}
- inner class CollectingObserver<T>(
- private val liveData: LiveData<T>
- ) : Observer<T> {
- private var items = mutableListOf<T>()
- override fun onChanged(t: T) {
- items.add(t)
- }
-
- fun assertItems(vararg expected: T) {
- assertThat(items).containsExactly(*expected)
- }
-
- fun unsubscribe() = runOnMain {
- liveData.removeObserver(this)
- }
- }
+ private fun <T> LiveData<T>.addObserver() = this.addObserver(scopes)
}
\ No newline at end of file
diff --git a/lifecycle/lifecycle-livedata-ktx/src/test/java/androidx/lifecycle/FlowLiveDataTest.kt b/lifecycle/lifecycle-livedata-ktx/src/test/java/androidx/lifecycle/FlowLiveDataTest.kt
new file mode 100644
index 0000000..297c3d0
--- /dev/null
+++ b/lifecycle/lifecycle-livedata-ktx/src/test/java/androidx/lifecycle/FlowLiveDataTest.kt
@@ -0,0 +1,231 @@
+/*
+ * Copyright 2019 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package androidx.lifecycle
+
+import com.google.common.truth.Truth.assertThat
+import kotlinx.coroutines.CompletableDeferred
+import kotlinx.coroutines.CoroutineExceptionHandler
+import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.cancel
+import kotlinx.coroutines.channels.awaitClose
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.flow.callbackFlow
+import kotlinx.coroutines.flow.flow
+import kotlinx.coroutines.flow.flowOf
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.runBlocking
+import org.junit.Rule
+import org.junit.Test
+import org.junit.runner.RunWith
+import org.junit.runners.JUnit4
+import java.time.Duration
+import java.util.concurrent.atomic.AtomicBoolean
+import kotlin.coroutines.coroutineContext
+
+@ExperimentalCoroutinesApi
+@RunWith(JUnit4::class)
+class FlowLiveDataTest {
+ @get:Rule
+ val scopes = ScopesRule()
+ private val mainScope = scopes.mainScope
+ private val testScope = scopes.testScope
+
+ private fun <T> LiveData<T>.addObserver() = this.addObserver(scopes)
+
+ @Test
+ fun oneShot() {
+ val liveData = flowOf(3).asLiveData()
+ scopes.triggerAllActions()
+ assertThat(liveData.value).isNull()
+ liveData.addObserver().assertItems(3)
+ }
+
+ @Test
+ fun removeObserverInBetween() {
+ val ld = flow {
+ emit(1)
+ emit(2)
+ delay(1000)
+ emit(3)
+ }.asLiveData(timeoutInMs = 10)
+
+ ld.addObserver().apply {
+ assertItems(1, 2)
+ unsubscribe()
+ }
+ // trigger cancellation
+ mainScope.advanceTimeBy(100)
+ assertThat(ld.hasActiveObservers()).isFalse()
+ ld.addObserver().apply {
+ scopes.triggerAllActions()
+ assertItems(2, 1, 2)
+ mainScope.advanceTimeBy(1001)
+ assertItems(2, 1, 2, 3)
+ }
+ }
+
+ @Test
+ fun callbackFlow_cancelled() {
+ var closeCalled = false
+ val ld = callbackFlow {
+ testScope.launch {
+ offer(1)
+ offer(2)
+ delay(1000)
+ offer(3)
+ }
+ awaitClose {
+ closeCalled = true
+ }
+ }.asLiveData(timeoutInMs = 10)
+
+ ld.addObserver().apply {
+ scopes.triggerAllActions()
+ assertItems(1, 2)
+ unsubscribe()
+ }
+ assertThat(closeCalled).isFalse()
+ // trigger cancellation
+ mainScope.advanceTimeBy(100)
+ assertThat(ld.hasActiveObservers()).isFalse()
+ assertThat(closeCalled).isTrue()
+ ld.addObserver().apply {
+ scopes.triggerAllActions()
+ assertItems(2, 1, 2)
+ scopes.advanceTimeBy(1001)
+ assertItems(2, 1, 2, 3)
+ }
+ }
+
+ @Test
+ fun removeObserverInBetween_largeTimeout() {
+ val ld = flow {
+ emit(1)
+ emit(2)
+ delay(1000)
+ emit(3)
+ }.asLiveData(timeoutInMs = 10000)
+
+ ld.addObserver().apply {
+ assertItems(1, 2)
+ unsubscribe()
+ }
+ // advance some but not enough to cover the delay
+ mainScope.advanceTimeBy(500)
+ assertThat(ld.hasActiveObservers()).isFalse()
+ assertThat(ld.value).isEqualTo(2)
+ ld.addObserver().apply {
+ assertItems(2)
+ // advance enough to cover the rest of the delay
+ mainScope.advanceTimeBy(501)
+ assertItems(2, 3)
+ }
+ }
+
+ @Test
+ fun timeoutViaDuration() {
+ val running = CompletableDeferred<Unit>()
+ val ld = flow {
+ try {
+ emit(1)
+ delay(5_001)
+ emit(2)
+ } finally {
+ running.complete(Unit)
+ }
+ }.asLiveData(timeout = Duration.ofSeconds(5))
+
+ ld.addObserver().apply {
+ assertItems(1)
+ unsubscribe()
+ }
+ // advance some but not enough to cover the delay
+ mainScope.advanceTimeBy(4_000)
+ assertThat(running.isActive).isTrue()
+ assertThat(ld.hasActiveObservers()).isFalse()
+ assertThat(ld.value).isEqualTo(1)
+ // advance time to finish
+ mainScope.advanceTimeBy(1_000)
+ // ensure it is not running anymore
+ assertThat(running.isActive).isFalse()
+ assertThat(ld.value).isEqualTo(1)
+ }
+
+ @Test
+ fun flowThrows() {
+ // use an exception handler instead of the test context exception handler to ensure that
+ // we do not re-run the block if its exception is gracefully caught
+ // TODO should we consider doing that ? But if we do, what is the rule? do we retry when
+ // it becomes active again or do we retry ourselves? better no do anything to be consistent.
+ val exception = CompletableDeferred<Throwable>()
+ val exceptionHandler = CoroutineExceptionHandler { _, throwable ->
+ exception.complete(throwable)
+ }
+ val ld = flow {
+ if (exception.isActive) {
+ throw IllegalArgumentException("i like to fail")
+ } else {
+ emit(3)
+ }
+ }.asLiveData(testScope.coroutineContext + exceptionHandler, 10)
+ ld.addObserver().apply {
+ scopes.triggerAllActions()
+ assertItems()
+ runBlocking {
+ assertThat(exception.await()).hasMessageThat().contains("i like to fail")
+ }
+ unsubscribe()
+ }
+ scopes.triggerAllActions()
+ ld.addObserver().apply {
+ scopes.triggerAllActions()
+ assertItems()
+ }
+ }
+
+ @Test
+ fun flowCancelsItself() {
+ val didCancel = AtomicBoolean(false)
+ val unexpected = AtomicBoolean(false)
+
+ val ld = flow<Int> {
+ if (didCancel.compareAndSet(false, true)) {
+ coroutineContext.cancel()
+ } else {
+ unexpected.set(true)
+ }
+ }.asLiveData(testScope.coroutineContext, 10)
+ ld.addObserver().apply {
+ scopes.triggerAllActions()
+ assertItems()
+ unsubscribe()
+ }
+ assertThat(didCancel.get()).isTrue()
+ ld.addObserver()
+ // trigger cancelation
+ scopes.advanceTimeBy(11)
+ assertThat(unexpected.get()).isFalse()
+ }
+
+ @Test
+ fun multipleValuesAndObservers() {
+ val ld = flowOf(3, 4).asLiveData()
+ ld.addObserver().assertItems(3, 4)
+ // re-observe, get latest value only
+ ld.addObserver().assertItems(4)
+ }
+}
\ No newline at end of file
diff --git a/lifecycle/lifecycle-livedata-ktx/src/test/java/androidx/lifecycle/ScopesRule.kt b/lifecycle/lifecycle-livedata-ktx/src/test/java/androidx/lifecycle/ScopesRule.kt
new file mode 100644
index 0000000..9d96fb7
--- /dev/null
+++ b/lifecycle/lifecycle-livedata-ktx/src/test/java/androidx/lifecycle/ScopesRule.kt
@@ -0,0 +1,142 @@
+/*
+ * Copyright 2019 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package androidx.lifecycle
+
+import androidx.arch.core.executor.ArchTaskExecutor
+import androidx.arch.core.executor.TaskExecutor
+import com.google.common.truth.Truth
+import kotlinx.coroutines.Dispatchers
+import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.async
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.runBlocking
+import kotlinx.coroutines.test.TestCoroutineDispatcher
+import kotlinx.coroutines.test.TestCoroutineScope
+import kotlinx.coroutines.test.resetMain
+import kotlinx.coroutines.test.setMain
+import org.junit.rules.TestWatcher
+import org.junit.runner.Description
+
+@ExperimentalCoroutinesApi
+class ScopesRule : TestWatcher() {
+ private val mainDispatcher = TestCoroutineDispatcher()
+ val mainScope = TestCoroutineScope(mainDispatcher)
+
+ private val testDispatcher = TestCoroutineDispatcher()
+ val testScope = TestCoroutineScope(testDispatcher)
+
+ override fun starting(description: Description?) {
+ Dispatchers.setMain(mainDispatcher)
+ ArchTaskExecutor.getInstance().setDelegate(
+ object : TaskExecutor() {
+ override fun executeOnDiskIO(runnable: Runnable) {
+ error("unsupported")
+ }
+
+ override fun postToMainThread(runnable: Runnable) {
+ mainScope.launch {
+ runnable.run()
+ }
+ }
+
+ override fun isMainThread(): Boolean {
+ // we have only one thread in this test.
+ return true
+ }
+ }
+ )
+ // manually roll the time
+ mainScope.pauseDispatcher()
+ testScope.pauseDispatcher()
+ }
+
+ override fun finished(description: Description?) {
+ advanceTimeBy(100000)
+ mainScope.cleanupTestCoroutines()
+ testScope.cleanupTestCoroutines()
+ ArchTaskExecutor.getInstance().setDelegate(null)
+ Dispatchers.resetMain()
+ }
+
+ fun advanceTimeBy(time: Long) {
+ mainScope.advanceTimeBy(time)
+ testScope.advanceTimeBy(time)
+ triggerAllActions()
+ }
+
+ fun triggerAllActions() {
+ do {
+ mainScope.runCurrent()
+ testScope.runCurrent()
+ val allIdle = listOf(mainDispatcher, testDispatcher).all {
+ it.isIdle()
+ }
+ } while (!allIdle)
+ }
+
+ fun <T> runOnMain(block: () -> T): T {
+ return runBlocking {
+ val async = mainScope.async {
+ block()
+ }
+ mainScope.runCurrent()
+ async.await()
+ }
+ }
+
+ private fun TestCoroutineDispatcher.isIdle(): Boolean {
+ val queueField = this::class.java
+ .getDeclaredField("queue")
+ queueField.isAccessible = true
+ val queue = queueField.get(this)
+ val peekMethod = queue::class.java
+ .getDeclaredMethod("peek")
+ val nextTask = peekMethod.invoke(queue) ?: return true
+ val timeField = nextTask::class.java.getDeclaredField("time")
+ timeField.isAccessible = true
+ val time = timeField.getLong(nextTask)
+ return time > testDispatcher.currentTime
+ }
+}
+
+@ExperimentalCoroutinesApi
+fun <T> LiveData<T>.addObserver(scopes: ScopesRule): CollectingObserver<T> {
+ return scopes.runOnMain {
+ val observer = CollectingObserver(this, scopes)
+ observeForever(observer)
+ observer
+ }
+}
+
+@ExperimentalCoroutinesApi
+class CollectingObserver<T>(
+ private val liveData: LiveData<T>,
+ private val scopes: ScopesRule
+) : Observer<T> {
+ private var items = mutableListOf<T>()
+ override fun onChanged(t: T) {
+ items.add(t)
+ }
+
+ fun assertItems(vararg expected: T) {
+ Truth.assertThat(items).containsExactly(*expected)
+ }
+
+ fun unsubscribe() = scopes.runOnMain {
+ liveData.removeObserver(this)
+ }
+}
\ No newline at end of file