Introduce Flow.asLiveData() terminal operator.

It inherits semantics and behavior from the liveData {} builder.

Test: FlowLiveDataTest
Change-Id: I7c46bed89a81cd0c8550a5728061dae9fed2fb95
diff --git a/buildSrc/src/main/kotlin/androidx/build/dependencies/Dependencies.kt b/buildSrc/src/main/kotlin/androidx/build/dependencies/Dependencies.kt
index 2679c13..dced908 100644
--- a/buildSrc/src/main/kotlin/androidx/build/dependencies/Dependencies.kt
+++ b/buildSrc/src/main/kotlin/androidx/build/dependencies/Dependencies.kt
@@ -71,10 +71,14 @@
     "org.jetbrains.kotlinx:kotlinx-coroutines-guava:$KOTLIN_COROUTINES_VERSION"
 const val KOTLIN_COROUTINES_TEST =
     "org.jetbrains.kotlinx:kotlinx-coroutines-test:$KOTLIN_COROUTINES_VERSION"
+
+private const val KOTLIN_COROUTINES_PREVIEW_VERSION = "1.3.0-RC"
 const val KOTLIN_COROUTINES_PREVIEW =
-    "org.jetbrains.kotlinx:kotlinx-coroutines-android:1.3.0-RC"
+    "org.jetbrains.kotlinx:kotlinx-coroutines-android:$KOTLIN_COROUTINES_PREVIEW_VERSION"
+const val KOTLIN_COROUTINES_CORE_PREVIEW =
+    "org.jetbrains.kotlinx:kotlinx-coroutines-core:$KOTLIN_COROUTINES_PREVIEW_VERSION"
 const val KOTLIN_COROUTINES_TEST_PREVIEW =
-    "org.jetbrains.kotlinx:kotlinx-coroutines-test:1.3.0-RC"
+    "org.jetbrains.kotlinx:kotlinx-coroutines-test:$KOTLIN_COROUTINES_PREVIEW_VERSION"
 
 const val LEAKCANARY_INSTRUMENTATION =
     "com.squareup.leakcanary:leakcanary-android-instrumentation:1.6.2"
diff --git a/lifecycle/lifecycle-livedata-ktx/api/2.2.0-alpha03.txt b/lifecycle/lifecycle-livedata-ktx/api/2.2.0-alpha03.txt
index 9dc2bee..5aec16c 100644
--- a/lifecycle/lifecycle-livedata-ktx/api/2.2.0-alpha03.txt
+++ b/lifecycle/lifecycle-livedata-ktx/api/2.2.0-alpha03.txt
@@ -11,6 +11,16 @@
     method public static <T> androidx.lifecycle.LiveData<T> liveData(kotlin.coroutines.CoroutineContext context = EmptyCoroutineContext, long timeoutInMs = 5000L, kotlin.jvm.functions.Function2<? super androidx.lifecycle.LiveDataScope<T>,? super kotlin.coroutines.Continuation<? super kotlin.Unit>,?> block);
   }
 
+  public final class FlowLiveDataApi26Kt {
+    ctor public FlowLiveDataApi26Kt();
+    method @RequiresApi(android.os.Build.VERSION_CODES.O) public static <T> androidx.lifecycle.LiveData<T> asLiveData(kotlinx.coroutines.flow.Flow<? extends T>, kotlin.coroutines.CoroutineContext context = EmptyCoroutineContext, java.time.Duration timeout);
+  }
+
+  public final class FlowLiveDataKt {
+    ctor public FlowLiveDataKt();
+    method public static <T> androidx.lifecycle.LiveData<T> asLiveData(kotlinx.coroutines.flow.Flow<? extends T>, kotlin.coroutines.CoroutineContext context = EmptyCoroutineContext, long timeoutInMs = 5000L);
+  }
+
   public interface LiveDataScope<T> {
     method public suspend Object! emit(T? value, kotlin.coroutines.Continuation<? super kotlin.Unit> p);
     method public suspend Object emitSource(androidx.lifecycle.LiveData<T> source, kotlin.coroutines.Continuation<? super kotlinx.coroutines.DisposableHandle> p);
diff --git a/lifecycle/lifecycle-livedata-ktx/api/current.txt b/lifecycle/lifecycle-livedata-ktx/api/current.txt
index 9dc2bee..5aec16c 100644
--- a/lifecycle/lifecycle-livedata-ktx/api/current.txt
+++ b/lifecycle/lifecycle-livedata-ktx/api/current.txt
@@ -11,6 +11,16 @@
     method public static <T> androidx.lifecycle.LiveData<T> liveData(kotlin.coroutines.CoroutineContext context = EmptyCoroutineContext, long timeoutInMs = 5000L, kotlin.jvm.functions.Function2<? super androidx.lifecycle.LiveDataScope<T>,? super kotlin.coroutines.Continuation<? super kotlin.Unit>,?> block);
   }
 
+  public final class FlowLiveDataApi26Kt {
+    ctor public FlowLiveDataApi26Kt();
+    method @RequiresApi(android.os.Build.VERSION_CODES.O) public static <T> androidx.lifecycle.LiveData<T> asLiveData(kotlinx.coroutines.flow.Flow<? extends T>, kotlin.coroutines.CoroutineContext context = EmptyCoroutineContext, java.time.Duration timeout);
+  }
+
+  public final class FlowLiveDataKt {
+    ctor public FlowLiveDataKt();
+    method public static <T> androidx.lifecycle.LiveData<T> asLiveData(kotlinx.coroutines.flow.Flow<? extends T>, kotlin.coroutines.CoroutineContext context = EmptyCoroutineContext, long timeoutInMs = 5000L);
+  }
+
   public interface LiveDataScope<T> {
     method public suspend Object! emit(T? value, kotlin.coroutines.Continuation<? super kotlin.Unit> p);
     method public suspend Object emitSource(androidx.lifecycle.LiveData<T> source, kotlin.coroutines.Continuation<? super kotlinx.coroutines.DisposableHandle> p);
diff --git a/lifecycle/lifecycle-livedata-ktx/api/restricted_2.2.0-alpha03.txt b/lifecycle/lifecycle-livedata-ktx/api/restricted_2.2.0-alpha03.txt
index 9dc2bee..5aec16c 100644
--- a/lifecycle/lifecycle-livedata-ktx/api/restricted_2.2.0-alpha03.txt
+++ b/lifecycle/lifecycle-livedata-ktx/api/restricted_2.2.0-alpha03.txt
@@ -11,6 +11,16 @@
     method public static <T> androidx.lifecycle.LiveData<T> liveData(kotlin.coroutines.CoroutineContext context = EmptyCoroutineContext, long timeoutInMs = 5000L, kotlin.jvm.functions.Function2<? super androidx.lifecycle.LiveDataScope<T>,? super kotlin.coroutines.Continuation<? super kotlin.Unit>,?> block);
   }
 
+  public final class FlowLiveDataApi26Kt {
+    ctor public FlowLiveDataApi26Kt();
+    method @RequiresApi(android.os.Build.VERSION_CODES.O) public static <T> androidx.lifecycle.LiveData<T> asLiveData(kotlinx.coroutines.flow.Flow<? extends T>, kotlin.coroutines.CoroutineContext context = EmptyCoroutineContext, java.time.Duration timeout);
+  }
+
+  public final class FlowLiveDataKt {
+    ctor public FlowLiveDataKt();
+    method public static <T> androidx.lifecycle.LiveData<T> asLiveData(kotlinx.coroutines.flow.Flow<? extends T>, kotlin.coroutines.CoroutineContext context = EmptyCoroutineContext, long timeoutInMs = 5000L);
+  }
+
   public interface LiveDataScope<T> {
     method public suspend Object! emit(T? value, kotlin.coroutines.Continuation<? super kotlin.Unit> p);
     method public suspend Object emitSource(androidx.lifecycle.LiveData<T> source, kotlin.coroutines.Continuation<? super kotlinx.coroutines.DisposableHandle> p);
diff --git a/lifecycle/lifecycle-livedata-ktx/api/restricted_current.txt b/lifecycle/lifecycle-livedata-ktx/api/restricted_current.txt
index 9dc2bee..5aec16c 100644
--- a/lifecycle/lifecycle-livedata-ktx/api/restricted_current.txt
+++ b/lifecycle/lifecycle-livedata-ktx/api/restricted_current.txt
@@ -11,6 +11,16 @@
     method public static <T> androidx.lifecycle.LiveData<T> liveData(kotlin.coroutines.CoroutineContext context = EmptyCoroutineContext, long timeoutInMs = 5000L, kotlin.jvm.functions.Function2<? super androidx.lifecycle.LiveDataScope<T>,? super kotlin.coroutines.Continuation<? super kotlin.Unit>,?> block);
   }
 
+  public final class FlowLiveDataApi26Kt {
+    ctor public FlowLiveDataApi26Kt();
+    method @RequiresApi(android.os.Build.VERSION_CODES.O) public static <T> androidx.lifecycle.LiveData<T> asLiveData(kotlinx.coroutines.flow.Flow<? extends T>, kotlin.coroutines.CoroutineContext context = EmptyCoroutineContext, java.time.Duration timeout);
+  }
+
+  public final class FlowLiveDataKt {
+    ctor public FlowLiveDataKt();
+    method public static <T> androidx.lifecycle.LiveData<T> asLiveData(kotlinx.coroutines.flow.Flow<? extends T>, kotlin.coroutines.CoroutineContext context = EmptyCoroutineContext, long timeoutInMs = 5000L);
+  }
+
   public interface LiveDataScope<T> {
     method public suspend Object! emit(T? value, kotlin.coroutines.Continuation<? super kotlin.Unit> p);
     method public suspend Object emitSource(androidx.lifecycle.LiveData<T> source, kotlin.coroutines.Continuation<? super kotlinx.coroutines.DisposableHandle> p);
diff --git a/lifecycle/lifecycle-livedata-ktx/build.gradle b/lifecycle/lifecycle-livedata-ktx/build.gradle
index 0c72bc8..cf0517c 100644
--- a/lifecycle/lifecycle-livedata-ktx/build.gradle
+++ b/lifecycle/lifecycle-livedata-ktx/build.gradle
@@ -41,7 +41,7 @@
     api(project(":lifecycle:lifecycle-livedata"))
     api(project(":lifecycle:lifecycle-livedata-core-ktx"))
     api(KOTLIN_STDLIB)
-    api(KOTLIN_COROUTINES_CORE)
+    api(KOTLIN_COROUTINES_CORE_PREVIEW)
     testImplementation(project(":lifecycle:lifecycle-runtime"))
     testImplementation("androidx.arch.core:core-testing:2.1.0-rc01")
     testImplementation(JUNIT)
diff --git a/lifecycle/lifecycle-livedata-ktx/src/main/java/androidx/lifecycle/FlowLiveData.kt b/lifecycle/lifecycle-livedata-ktx/src/main/java/androidx/lifecycle/FlowLiveData.kt
new file mode 100644
index 0000000..d14c561
--- /dev/null
+++ b/lifecycle/lifecycle-livedata-ktx/src/main/java/androidx/lifecycle/FlowLiveData.kt
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2019 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package androidx.lifecycle
+
+import kotlinx.coroutines.Dispatchers
+import kotlinx.coroutines.flow.Flow
+import kotlinx.coroutines.flow.collect
+import kotlin.coroutines.CoroutineContext
+import kotlin.coroutines.EmptyCoroutineContext
+
+/**
+ * Creates a LiveData that has values collected from the origin [Flow].
+ *
+ * The upstream flow collection starts when the returned [LiveData] becomes active
+ * ([LiveData.onActive]).
+ * If the [LiveData] becomes inactive ([LiveData.onInactive]) while the flow has not completed,
+ * the flow collection will be cancelled after [timeoutInMs] milliseconds unless the [LiveData]
+ * becomes active again before that timeout (to gracefully handle cases like Activity rotation).
+ *
+ * After a cancellation, if the [LiveData] becomes active again, the upstream flow collection will
+ * be re-executed.
+ *
+ * If the upstream flow completes successfully *or* is cancelled due to reasons other than
+ * [LiveData] becoming inactive, it *will not* be re-collected even after [LiveData] goes through
+ * an active/inactive cycle.
+ *
+ * If the flow completes with an exception, that exception will be delivered to the
+ * [CoroutineExceptionHandler][kotlinx.coroutines.CoroutineExceptionHandler] of provided [context].
+ * By default [EmptyCoroutineContext] is used, so an exception will be delivered to the main
+ * thread's [UncaughtExceptionHandler][Thread.UncaughtExceptionHandler]. If your flow upstream is
+ * expected to throw, you can use [catch operator][kotlinx.coroutines.flow.catch] on upstream flow
+ * to emit a helpful error object.
+ *
+ * @param context The CoroutineContext to collect the upstream flow in. Defaults to
+ * [EmptyCoroutineContext] combined with [Dispatchers.Main]
+ * @param timeoutInMs The timeout in ms before cancelling the block if there are no active observers
+ * ([LiveData.hasActiveObservers]). Defaults to [DEFAULT_TIMEOUT].
+ */
+fun <T> Flow<T>.asLiveData(
+    context: CoroutineContext = EmptyCoroutineContext,
+    timeoutInMs: Long = DEFAULT_TIMEOUT
+): LiveData<T> = liveData(context, timeoutInMs) {
+    collect {
+        emit(it)
+    }
+}
\ No newline at end of file
diff --git a/lifecycle/lifecycle-livedata-ktx/src/main/java/androidx/lifecycle/FlowLiveDataApi26.kt b/lifecycle/lifecycle-livedata-ktx/src/main/java/androidx/lifecycle/FlowLiveDataApi26.kt
new file mode 100644
index 0000000..ac2643d
--- /dev/null
+++ b/lifecycle/lifecycle-livedata-ktx/src/main/java/androidx/lifecycle/FlowLiveDataApi26.kt
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2019 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package androidx.lifecycle
+
+import android.os.Build
+import androidx.annotation.RequiresApi
+import kotlinx.coroutines.Dispatchers
+import kotlinx.coroutines.flow.Flow
+import java.time.Duration
+import kotlin.coroutines.CoroutineContext
+import kotlin.coroutines.EmptyCoroutineContext
+
+/**
+ * Creates a LiveData that has values collected from the origin [Flow].
+ *
+ * The upstream flow collection starts when the returned [LiveData] becomes active
+ * ([LiveData.onActive]).
+ * If the [LiveData] becomes inactive ([LiveData.onInactive]) while the flow has not completed,
+ * the flow collection will be cancelled after [timeout] unless the [LiveData]
+ * becomes active again before that timeout (to gracefully handle cases like Activity rotation).
+ *
+ * After a cancellation, if the [LiveData] becomes active again, the upstream flow collection will
+ * be re-executed.
+ *
+ * If the upstream flow completes successfully *or* is cancelled due to reasons other than
+ * [LiveData] becoming inactive, it *will not* be re-collected even after [LiveData] goes through
+ * an active/inactive cycle.
+ *
+ * If the flow completes with an exception, that exception will be delivered to the
+ * [CoroutineExceptionHandler][kotlinx.coroutines.CoroutineExceptionHandler] of provided [context].
+ * By default [EmptyCoroutineContext] is used, so an exception will be delivered to the main
+ * thread's [UncaughtExceptionHandler][Thread.UncaughtExceptionHandler]. If your flow upstream is
+ * expected to throw, you can use [catch operator][kotlinx.coroutines.flow.catch] on upstream flow
+ * to emit a helpful error object.
+ *
+ * @param context The CoroutineContext to collect the upstream flow in. Defaults to
+ * [EmptyCoroutineContext] combined with [Dispatchers.Main]
+ * @param timeout The timeout before cancelling the block if there are no active observers
+ * ([LiveData.hasActiveObservers]). It has no default and is converted with [Duration.toMillis].
+ */
+@RequiresApi(Build.VERSION_CODES.O)
+fun <T> Flow<T>.asLiveData(
+    context: CoroutineContext = EmptyCoroutineContext,
+    timeout: Duration
+): LiveData<T> = asLiveData(context, timeout.toMillis())
\ No newline at end of file
diff --git a/lifecycle/lifecycle-livedata-ktx/src/test/java/androidx/lifecycle/BuildLiveDataTest.kt b/lifecycle/lifecycle-livedata-ktx/src/test/java/androidx/lifecycle/BuildLiveDataTest.kt
index d0e6b8c..9e55611 100644
--- a/lifecycle/lifecycle-livedata-ktx/src/test/java/androidx/lifecycle/BuildLiveDataTest.kt
+++ b/lifecycle/lifecycle-livedata-ktx/src/test/java/androidx/lifecycle/BuildLiveDataTest.kt
@@ -16,26 +16,16 @@
 
 package androidx.lifecycle
 
-import androidx.arch.core.executor.ArchTaskExecutor
-import androidx.arch.core.executor.TaskExecutor
 import com.google.common.truth.Truth.assertThat
 import kotlinx.coroutines.CompletableDeferred
 import kotlinx.coroutines.CoroutineExceptionHandler
-import kotlinx.coroutines.Dispatchers
 import kotlinx.coroutines.ExperimentalCoroutinesApi
-import kotlinx.coroutines.async
 import kotlinx.coroutines.cancel
 import kotlinx.coroutines.delay
-import kotlinx.coroutines.launch
 import kotlinx.coroutines.runBlocking
 import kotlinx.coroutines.sync.Mutex
 import kotlinx.coroutines.sync.withLock
-import kotlinx.coroutines.test.TestCoroutineDispatcher
-import kotlinx.coroutines.test.TestCoroutineScope
-import kotlinx.coroutines.test.resetMain
-import kotlinx.coroutines.test.setMain
-import org.junit.After
-import org.junit.Before
+import org.junit.Rule
 import org.junit.Test
 import org.junit.runner.RunWith
 import org.junit.runners.JUnit4
@@ -47,53 +37,17 @@
 @ExperimentalCoroutinesApi
 @RunWith(JUnit4::class)
 class BuildLiveDataTest {
-    private val mainDispatcher = TestCoroutineDispatcher()
-    private val mainScope = TestCoroutineScope(mainDispatcher)
-
-    private val testDispatcher = TestCoroutineDispatcher()
-    private val testScope = TestCoroutineScope(testDispatcher)
-
-    @Before
-    fun initMain() {
-        Dispatchers.setMain(mainDispatcher)
-        ArchTaskExecutor.getInstance().setDelegate(
-            object : TaskExecutor() {
-                override fun executeOnDiskIO(runnable: Runnable) {
-                    error("unsupported")
-                }
-
-                override fun postToMainThread(runnable: Runnable) {
-                    mainScope.launch {
-                        runnable.run()
-                    }
-                }
-
-                override fun isMainThread(): Boolean {
-                    // we have only one thread in this test.
-                    return true
-                }
-            }
-        )
-        // manually roll the time
-        mainDispatcher.pauseDispatcher()
-        testScope.pauseDispatcher()
-    }
-
-    @After
-    fun clear() {
-        advanceTimeBy(100000)
-        mainScope.cleanupTestCoroutines()
-        testScope.cleanupTestCoroutines()
-        ArchTaskExecutor.getInstance().setDelegate(null)
-        Dispatchers.resetMain()
-    }
+    @get:Rule
+    val scopes = ScopesRule()
+    private val mainScope = scopes.mainScope
+    private val testScope = scopes.testScope
 
     @Test
     fun oneShot() {
         val liveData = liveData {
             emit(3)
         }
-        triggerAllActions()
+        scopes.triggerAllActions()
         assertThat(liveData.value).isNull()
         liveData.addObserver().assertItems(3)
     }
@@ -114,7 +68,7 @@
         mainScope.advanceTimeBy(100)
         assertThat(ld.hasActiveObservers()).isFalse()
         ld.addObserver().apply {
-            triggerAllActions()
+            scopes.triggerAllActions()
             assertItems(2, 1, 2)
             mainScope.advanceTimeBy(1001)
             assertItems(2, 1, 2, 3)
@@ -183,19 +137,19 @@
             }
         }
         ld.addObserver().apply {
-            triggerAllActions()
+            scopes.triggerAllActions()
             assertItems(1)
             unsubscribe()
             cancelMutex.unlock()
         }
         // let cancellation take place
-        triggerAllActions()
+        scopes.triggerAllActions()
         // emit should immediately trigger cancellation to happen
         assertThat(ld.value).isEqualTo(1)
         assertThat(ld.hasActiveObservers()).isFalse()
         // now because it was cancelled, re-observing should dispatch 1,1,2
         ld.addObserver().apply {
-            triggerAllActions()
+            scopes.triggerAllActions()
             assertItems(1, 1, 2)
         }
     }
@@ -206,11 +160,11 @@
         val ld = liveData<Int>(testScope.coroutineContext) {
             latest.set(latestValue)
         }
-        runOnMain {
+        scopes.runOnMain {
             ld.value = 3
         }
         ld.addObserver()
-        triggerAllActions()
+        scopes.triggerAllActions()
         assertThat(latest.get()).isEqualTo(3)
     }
 
@@ -222,7 +176,7 @@
             latest.set(latestValue)
         }
         ld.addObserver()
-        triggerAllActions()
+        scopes.triggerAllActions()
         assertThat(latest.get()).isEqualTo(5)
     }
 
@@ -238,16 +192,16 @@
             latest.set(latestValue)
         }
         ld.addObserver().apply {
-            triggerAllActions()
+            scopes.triggerAllActions()
             assertItems(5)
             unsubscribe()
         }
-        triggerAllActions()
+        scopes.triggerAllActions()
         // wait for it to be cancelled
-        advanceTimeBy(10)
+        scopes.advanceTimeBy(10)
         assertThat(latest.get()).isNull()
         ld.addObserver()
-        triggerAllActions()
+        scopes.triggerAllActions()
         assertThat(latest.get()).isEqualTo(5)
     }
 
@@ -288,7 +242,7 @@
             emitSource(evens)
         }
         ld.addObserver().apply {
-            triggerAllActions()
+            scopes.triggerAllActions()
             assertItems(1, 3, 5, 7, 9, 2, 4, 6, 8, 10)
         }
     }
@@ -310,8 +264,8 @@
             emit(10)
         }
         ld.addObserver().apply {
-            triggerAllActions()
-            advanceTimeBy(100)
+            scopes.triggerAllActions()
+            scopes.advanceTimeBy(100)
             assertItems(1, 3, 5, 7, 9, 10)
         }
     }
@@ -328,10 +282,10 @@
         }
         val ld = liveData {
             val disposable = emitSource(odds)
-            triggerAllActions()
+            scopes.triggerAllActions()
             assertThat(odds.hasActiveObservers()).isEqualTo(true)
             disposable.dispose()
-            triggerAllActions()
+            scopes.triggerAllActions()
             assertThat(odds.hasActiveObservers()).isEqualTo(false)
             doneOddsYield.unlock()
         }
@@ -349,16 +303,16 @@
         }
         val ld = liveData {
             val disposable = emitSource(odds)
-            triggerAllActions()
+            scopes.triggerAllActions()
             disposable.dispose()
-            triggerAllActions()
+            scopes.triggerAllActions()
             assertThat(odds.hasActiveObservers()).isEqualTo(false)
             // add observer via side channel.
             (this as LiveDataScopeImpl<Int>).target.addSource(odds) {}
-            triggerAllActions()
+            scopes.triggerAllActions()
             // redispose previous one should not impact
             disposable.dispose()
-            triggerAllActions()
+            scopes.triggerAllActions()
             // still has the observer we added from the side channel
             assertThat(odds.hasActiveObservers()).isEqualTo(true)
         }
@@ -385,16 +339,16 @@
             }
         }
         ld.addObserver().apply {
-            triggerAllActions()
+            scopes.triggerAllActions()
             assertItems()
             runBlocking {
                 assertThat(exception.await()).hasMessageThat().contains("i like to fail")
             }
             unsubscribe()
         }
-        triggerAllActions()
+        scopes.triggerAllActions()
         ld.addObserver().apply {
-            triggerAllActions()
+            scopes.triggerAllActions()
             assertItems()
         }
     }
@@ -412,14 +366,14 @@
             }
         }
         ld.addObserver().apply {
-            triggerAllActions()
+            scopes.triggerAllActions()
             assertItems()
             unsubscribe()
         }
         assertThat(didCancel.get()).isTrue()
         ld.addObserver()
         // trigger cancelation
-        advanceTimeBy(11)
+        scopes.advanceTimeBy(11)
         assertThat(unexpected.get()).isFalse()
     }
 
@@ -441,69 +395,21 @@
         }
         ld.addObserver().apply {
             assertItems()
-            runOnMain {
+            scopes.runOnMain {
                 src.value = 1
             }
-            triggerAllActions()
+            scopes.triggerAllActions()
             runBlocking {
                 assertThat(exception.await()).hasMessageThat().contains("i like to fail")
             }
-            runOnMain {
+            scopes.runOnMain {
                 src.value = 2
             }
-            triggerAllActions()
+            scopes.triggerAllActions()
             assertItems(3)
         }
     }
 
-    private fun triggerAllActions() {
-        do {
-            mainScope.runCurrent()
-            testScope.runCurrent()
-            val allIdle = listOf(mainDispatcher, testDispatcher).all {
-                it.isIdle()
-            }
-        } while (!allIdle)
-    }
-
-    private fun advanceTimeBy(time: Long) {
-        mainScope.advanceTimeBy(time)
-        testScope.advanceTimeBy(time)
-        triggerAllActions()
-    }
-
-    private fun TestCoroutineDispatcher.isIdle(): Boolean {
-        val queueField = this::class.java
-            .getDeclaredField("queue")
-        queueField.isAccessible = true
-        val queue = queueField.get(this)
-        val peekMethod = queue::class.java
-            .getDeclaredMethod("peek")
-        val nextTask = peekMethod.invoke(queue) ?: return true
-        val timeField = nextTask::class.java.getDeclaredField("time")
-        timeField.isAccessible = true
-        val time = timeField.getLong(nextTask)
-        return time > testDispatcher.currentTime
-    }
-
-    private fun <T> runOnMain(block: () -> T): T {
-        return runBlocking {
-            val async = mainScope.async {
-                block()
-            }
-            mainScope.runCurrent()
-            async.await()
-        }
-    }
-
-    private fun <T> LiveData<T>.addObserver(): CollectingObserver<T> {
-        return runOnMain {
-            val observer = CollectingObserver(this)
-            observeForever(observer)
-            observer
-        }
-    }
-
     @Test
     fun multipleValuesAndObservers() {
         val ld = liveData {
@@ -515,20 +421,5 @@
         ld.addObserver().assertItems(4)
     }
 
-    inner class CollectingObserver<T>(
-        private val liveData: LiveData<T>
-    ) : Observer<T> {
-        private var items = mutableListOf<T>()
-        override fun onChanged(t: T) {
-            items.add(t)
-        }
-
-        fun assertItems(vararg expected: T) {
-            assertThat(items).containsExactly(*expected)
-        }
-
-        fun unsubscribe() = runOnMain {
-            liveData.removeObserver(this)
-        }
-    }
+    private fun <T> LiveData<T>.addObserver() = this.addObserver(scopes)
 }
\ No newline at end of file
diff --git a/lifecycle/lifecycle-livedata-ktx/src/test/java/androidx/lifecycle/FlowLiveDataTest.kt b/lifecycle/lifecycle-livedata-ktx/src/test/java/androidx/lifecycle/FlowLiveDataTest.kt
new file mode 100644
index 0000000..297c3d0
--- /dev/null
+++ b/lifecycle/lifecycle-livedata-ktx/src/test/java/androidx/lifecycle/FlowLiveDataTest.kt
@@ -0,0 +1,231 @@
+/*
+ * Copyright 2019 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package androidx.lifecycle
+
+import com.google.common.truth.Truth.assertThat
+import kotlinx.coroutines.CompletableDeferred
+import kotlinx.coroutines.CoroutineExceptionHandler
+import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.cancel
+import kotlinx.coroutines.channels.awaitClose
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.flow.callbackFlow
+import kotlinx.coroutines.flow.flow
+import kotlinx.coroutines.flow.flowOf
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.runBlocking
+import org.junit.Rule
+import org.junit.Test
+import org.junit.runner.RunWith
+import org.junit.runners.JUnit4
+import java.time.Duration
+import java.util.concurrent.atomic.AtomicBoolean
+import kotlin.coroutines.coroutineContext
+
+@ExperimentalCoroutinesApi
+@RunWith(JUnit4::class)
+class FlowLiveDataTest {
+    @get:Rule
+    val scopes = ScopesRule()
+    private val mainScope = scopes.mainScope
+    private val testScope = scopes.testScope
+
+    private fun <T> LiveData<T>.addObserver() = this.addObserver(scopes)
+
+    @Test
+    fun oneShot() {
+        val liveData = flowOf(3).asLiveData()
+        scopes.triggerAllActions()
+        assertThat(liveData.value).isNull()
+        liveData.addObserver().assertItems(3)
+    }
+
+    @Test
+    fun removeObserverInBetween() {
+        val ld = flow {
+                emit(1)
+                emit(2)
+                delay(1000)
+                emit(3)
+        }.asLiveData(timeoutInMs = 10)
+
+        ld.addObserver().apply {
+            assertItems(1, 2)
+            unsubscribe()
+        }
+        // trigger cancellation
+        mainScope.advanceTimeBy(100)
+        assertThat(ld.hasActiveObservers()).isFalse()
+        ld.addObserver().apply {
+            scopes.triggerAllActions()
+            assertItems(2, 1, 2)
+            mainScope.advanceTimeBy(1001)
+            assertItems(2, 1, 2, 3)
+        }
+    }
+
+    @Test
+    fun callbackFlow_cancelled() {
+        var closeCalled = false
+        val ld = callbackFlow {
+            testScope.launch {
+                offer(1)
+                offer(2)
+                delay(1000)
+                offer(3)
+            }
+            awaitClose {
+                closeCalled = true
+            }
+        }.asLiveData(timeoutInMs = 10)
+
+        ld.addObserver().apply {
+            scopes.triggerAllActions()
+            assertItems(1, 2)
+            unsubscribe()
+        }
+        assertThat(closeCalled).isFalse()
+        // trigger cancellation
+        mainScope.advanceTimeBy(100)
+        assertThat(ld.hasActiveObservers()).isFalse()
+        assertThat(closeCalled).isTrue()
+        ld.addObserver().apply {
+            scopes.triggerAllActions()
+            assertItems(2, 1, 2)
+            scopes.advanceTimeBy(1001)
+            assertItems(2, 1, 2, 3)
+        }
+    }
+
+    @Test
+    fun removeObserverInBetween_largeTimeout() {
+        val ld = flow {
+            emit(1)
+            emit(2)
+            delay(1000)
+            emit(3)
+        }.asLiveData(timeoutInMs = 10000)
+
+        ld.addObserver().apply {
+            assertItems(1, 2)
+            unsubscribe()
+        }
+        // advance some but not enough to cover the delay
+        mainScope.advanceTimeBy(500)
+        assertThat(ld.hasActiveObservers()).isFalse()
+        assertThat(ld.value).isEqualTo(2)
+        ld.addObserver().apply {
+            assertItems(2)
+            // advance enough to cover the rest of the delay
+            mainScope.advanceTimeBy(501)
+            assertItems(2, 3)
+        }
+    }
+
+    @Test
+    fun timeoutViaDuration() {
+        val running = CompletableDeferred<Unit>()
+        val ld = flow {
+            try {
+                emit(1)
+                delay(5_001)
+                emit(2)
+            } finally {
+                running.complete(Unit)
+            }
+        }.asLiveData(timeout = Duration.ofSeconds(5))
+
+        ld.addObserver().apply {
+            assertItems(1)
+            unsubscribe()
+        }
+        // advance some but not enough to cover the delay
+        mainScope.advanceTimeBy(4_000)
+        assertThat(running.isActive).isTrue()
+        assertThat(ld.hasActiveObservers()).isFalse()
+        assertThat(ld.value).isEqualTo(1)
+        // advance time to finish
+        mainScope.advanceTimeBy(1_000)
+        // ensure it is not running anymore
+        assertThat(running.isActive).isFalse()
+        assertThat(ld.value).isEqualTo(1)
+    }
+
+    @Test
+    fun flowThrows() {
+        // use an exception handler instead of the test context exception handler to ensure that
+        // we do not re-run the block if its exception is gracefully caught
+        // TODO should we consider doing that? But if we do, what is the rule? do we retry when
+        // it becomes active again or do we retry ourselves? better to do nothing to be consistent.
+        val exception = CompletableDeferred<Throwable>()
+        val exceptionHandler = CoroutineExceptionHandler { _, throwable ->
+            exception.complete(throwable)
+        }
+        val ld = flow {
+            if (exception.isActive) {
+                throw IllegalArgumentException("i like to fail")
+            } else {
+                emit(3)
+            }
+        }.asLiveData(testScope.coroutineContext + exceptionHandler, 10)
+        ld.addObserver().apply {
+            scopes.triggerAllActions()
+            assertItems()
+            runBlocking {
+                assertThat(exception.await()).hasMessageThat().contains("i like to fail")
+            }
+            unsubscribe()
+        }
+        scopes.triggerAllActions()
+        ld.addObserver().apply {
+            scopes.triggerAllActions()
+            assertItems()
+        }
+    }
+
+    @Test
+    fun flowCancelsItself() {
+        val didCancel = AtomicBoolean(false)
+        val unexpected = AtomicBoolean(false)
+
+        val ld = flow<Int> {
+            if (didCancel.compareAndSet(false, true)) {
+                coroutineContext.cancel()
+            } else {
+                unexpected.set(true)
+            }
+        }.asLiveData(testScope.coroutineContext, 10)
+        ld.addObserver().apply {
+            scopes.triggerAllActions()
+            assertItems()
+            unsubscribe()
+        }
+        assertThat(didCancel.get()).isTrue()
+        ld.addObserver()
+        // trigger cancelation
+        scopes.advanceTimeBy(11)
+        assertThat(unexpected.get()).isFalse()
+    }
+
+    @Test
+    fun multipleValuesAndObservers() {
+        val ld = flowOf(3, 4).asLiveData()
+        ld.addObserver().assertItems(3, 4)
+        // re-observe, get latest value only
+        ld.addObserver().assertItems(4)
+    }
+}
\ No newline at end of file
diff --git a/lifecycle/lifecycle-livedata-ktx/src/test/java/androidx/lifecycle/ScopesRule.kt b/lifecycle/lifecycle-livedata-ktx/src/test/java/androidx/lifecycle/ScopesRule.kt
new file mode 100644
index 0000000..9d96fb7
--- /dev/null
+++ b/lifecycle/lifecycle-livedata-ktx/src/test/java/androidx/lifecycle/ScopesRule.kt
@@ -0,0 +1,142 @@
+/*
+ * Copyright 2019 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package androidx.lifecycle
+
+import androidx.arch.core.executor.ArchTaskExecutor
+import androidx.arch.core.executor.TaskExecutor
+import com.google.common.truth.Truth
+import kotlinx.coroutines.Dispatchers
+import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.async
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.runBlocking
+import kotlinx.coroutines.test.TestCoroutineDispatcher
+import kotlinx.coroutines.test.TestCoroutineScope
+import kotlinx.coroutines.test.resetMain
+import kotlinx.coroutines.test.setMain
+import org.junit.rules.TestWatcher
+import org.junit.runner.Description
+
+/**
+ * JUnit rule that sets a paused [TestCoroutineDispatcher] as [Dispatchers.Main] and exposes it
+ * as [mainScope], next to a second paused [testScope]; tests roll both forward manually.
+ */
+@ExperimentalCoroutinesApi
+class ScopesRule : TestWatcher() {
+    private val mainDispatcher = TestCoroutineDispatcher()
+    val mainScope = TestCoroutineScope(mainDispatcher)
+
+    private val testDispatcher = TestCoroutineDispatcher()
+    val testScope = TestCoroutineScope(testDispatcher)
+
+    /** Installs the main dispatcher and executor delegate, then pauses both scopes. */
+    override fun starting(description: Description?) {
+        Dispatchers.setMain(mainDispatcher)
+        ArchTaskExecutor.getInstance().setDelegate(
+            object : TaskExecutor() {
+                override fun executeOnDiskIO(runnable: Runnable): Unit = error("unsupported")
+
+                override fun postToMainThread(runnable: Runnable) {
+                    mainScope.launch { runnable.run() }
+                }
+
+                override fun isMainThread(): Boolean = true // this test has only one thread
+            }
+        )
+        // manually roll the time
+        mainScope.pauseDispatcher()
+        testScope.pauseDispatcher()
+    }
+
+    /** Drains pending work and cleans up both scopes; global hooks are reset even on failure. */
+    override fun finished(description: Description?) {
+        try {
+            advanceTimeBy(100000)
+            mainScope.cleanupTestCoroutines()
+            testScope.cleanupTestCoroutines()
+        } finally {
+            // Always undo the global hooks so a failing cleanup cannot leak into later tests.
+            ArchTaskExecutor.getInstance().setDelegate(null)
+            Dispatchers.resetMain()
+        }
+    }
+
+    /** Advances the virtual time of both scopes by [time], then runs every action that is due. */
+    fun advanceTimeBy(time: Long) {
+        mainScope.advanceTimeBy(time)
+        testScope.advanceTimeBy(time)
+        triggerAllActions()
+    }
+
+    /** Runs the current tasks of both scopes repeatedly until both dispatchers report idle. */
+    fun triggerAllActions() {
+        do {
+            mainScope.runCurrent()
+            testScope.runCurrent()
+            val allIdle = listOf(mainDispatcher, testDispatcher).all { it.isIdle() }
+        } while (!allIdle)
+    }
+
+    /** Runs [block] in a coroutine on [mainScope] and returns its result synchronously. */
+    fun <T> runOnMain(block: () -> T): T = runBlocking {
+        val deferred = mainScope.async { block() }
+        mainScope.runCurrent()
+        deferred.await()
+    }
+
+    /**
+     * True when the dispatcher's `queue` (read reflectively) is empty or its head task is
+     * scheduled later than the receiver's own clock; [testDispatcher]'s clock is not used.
+     */
+    private fun TestCoroutineDispatcher.isIdle(): Boolean {
+        val queueField = this::class.java.getDeclaredField("queue")
+        queueField.isAccessible = true
+        val queue = queueField.get(this)
+        val nextTask = queue::class.java.getDeclaredMethod("peek").invoke(queue) ?: return true
+        val timeField = nextTask::class.java.getDeclaredField("time")
+        timeField.isAccessible = true
+        return timeField.getLong(nextTask) > currentTime
+    }
+}
+
+/** Observes this LiveData forever with a new [CollectingObserver], registered on the main scope. */
+@ExperimentalCoroutinesApi
+fun <T> LiveData<T>.addObserver(scopes: ScopesRule): CollectingObserver<T> =
+    scopes.runOnMain {
+        CollectingObserver(this, scopes).also { collector ->
+            observeForever(collector)
+        }
+    }
+
+/** [Observer] that records every value it receives so tests can assert on what was delivered. */
+@ExperimentalCoroutinesApi
+class CollectingObserver<T>(
+    private val liveData: LiveData<T>,
+    private val scopes: ScopesRule
+) : Observer<T> {
+    private val items = mutableListOf<T>()
+
+    override fun onChanged(t: T) { items += t }
+
+    /** Asserts that exactly [expected] was received so far, in the order it was delivered. */
+    fun assertItems(vararg expected: T) {
+        Truth.assertThat(items).containsExactly(*expected).inOrder()
+    }
+
+    /** Removes this observer from [liveData]; the removal runs through [ScopesRule.runOnMain]. */
+    fun unsubscribe() = scopes.runOnMain { liveData.removeObserver(this) }
+}
\ No newline at end of file