Convert DataSource.load to coroutine

Part of a series of changes needed to convert Paging from
ListenableFutures to Kotlin coroutines.

Makes DataSource.Params public to work around Metalava bug b/135947782,
which marks internal APIs as public; @hide does not currently provide a
way to hide them from the API docs.

Test: ./gradlew paging:paging-common:check
Change-Id: I57d34b406fd9f758fc096c244053454dd1199f79
diff --git a/paging/common/api/2.2.0-alpha01.ignore b/paging/common/api/2.2.0-alpha01.ignore
index 6d27f48..e04723a 100644
--- a/paging/common/api/2.2.0-alpha01.ignore
+++ b/paging/common/api/2.2.0-alpha01.ignore
@@ -1,4 +1,6 @@
 // Baseline format: 1.0
+AddedAbstractMethod: androidx.paging.DataSource#load$lintWithKotlin(androidx.paging.DataSource.Params<Key>, kotlin.coroutines.Continuation<? super androidx.paging.DataSource.BaseResult<Value>>):
+    Added method androidx.paging.DataSource.load$lintWithKotlin(androidx.paging.DataSource.Params<Key>,kotlin.coroutines.Continuation<? super androidx.paging.DataSource.BaseResult<Value>>)
 AddedAbstractMethod: androidx.paging.PagedList#isContiguous():
     Added method androidx.paging.PagedList.isContiguous()
 
diff --git a/paging/common/api/2.2.0-alpha01.txt b/paging/common/api/2.2.0-alpha01.txt
index 115c938..6738de4 100644
--- a/paging/common/api/2.2.0-alpha01.txt
+++ b/paging/common/api/2.2.0-alpha01.txt
@@ -8,6 +8,7 @@
     method @AnyThread public void invalidate();
     method @WorkerThread public boolean isInvalid();
     method public boolean isRetryableError(Throwable error);
+    method public abstract suspend Object load$lintWithKotlin(androidx.paging.DataSource.Params<Key> params, kotlin.coroutines.Continuation<? super androidx.paging.DataSource.BaseResult<Value>> p);
     method public <ToValue> androidx.paging.DataSource<Key,ToValue> map(androidx.arch.core.util.Function<Value,ToValue> function);
     method public <ToValue> androidx.paging.DataSource<Key,ToValue> map(kotlin.jvm.functions.Function1<? super Value,? extends ToValue> function);
     method public <ToValue> androidx.paging.DataSource<Key,ToValue> mapByPage(androidx.arch.core.util.Function<java.util.List<Value>,java.util.List<ToValue>> function);
@@ -42,6 +43,13 @@
     method @AnyThread public void onInvalidated();
   }
 
+  public static final class DataSource.Params<K> {
+    method public int getInitialLoadSize();
+    method public K? getKey();
+    method public int getPageSize();
+    method public boolean getPlaceholdersEnabled();
+  }
+
   public final class DataSourceKt {
     ctor public DataSourceKt();
   }
@@ -49,6 +57,7 @@
   public abstract class ItemKeyedDataSource<Key, Value> extends androidx.paging.DataSource<Key,Value> {
     ctor public ItemKeyedDataSource();
     method public abstract Key getKey(Value item);
+    method public final suspend Object load$lintWithKotlin(androidx.paging.DataSource.Params<Key> params, kotlin.coroutines.Continuation<? super androidx.paging.DataSource.BaseResult<Value>> p);
     method public abstract void loadAfter(androidx.paging.ItemKeyedDataSource.LoadParams<Key> params, androidx.paging.ItemKeyedDataSource.LoadCallback<Value> callback);
     method public abstract void loadBefore(androidx.paging.ItemKeyedDataSource.LoadParams<Key> params, androidx.paging.ItemKeyedDataSource.LoadCallback<Value> callback);
     method public abstract void loadInitial(androidx.paging.ItemKeyedDataSource.LoadInitialParams<Key> params, androidx.paging.ItemKeyedDataSource.LoadInitialCallback<Value> callback);
@@ -84,6 +93,7 @@
 
   public abstract class PageKeyedDataSource<Key, Value> extends androidx.paging.DataSource<Key,Value> {
     ctor public PageKeyedDataSource();
+    method public final suspend Object load$lintWithKotlin(androidx.paging.DataSource.Params<Key> params, kotlin.coroutines.Continuation<? super androidx.paging.DataSource.BaseResult<Value>> p);
     method public abstract void loadAfter(androidx.paging.PageKeyedDataSource.LoadParams<Key> params, androidx.paging.PageKeyedDataSource.LoadCallback<Key,Value> callback);
     method public abstract void loadBefore(androidx.paging.PageKeyedDataSource.LoadParams<Key> params, androidx.paging.PageKeyedDataSource.LoadCallback<Key,Value> callback);
     method public abstract void loadInitial(androidx.paging.PageKeyedDataSource.LoadInitialParams<Key> params, androidx.paging.PageKeyedDataSource.LoadInitialCallback<Key,Value> callback);
@@ -293,6 +303,7 @@
     ctor public PositionalDataSource();
     method public static final int computeInitialLoadPosition(androidx.paging.PositionalDataSource.LoadInitialParams params, int totalCount);
     method public static final int computeInitialLoadSize(androidx.paging.PositionalDataSource.LoadInitialParams params, int initialLoadPosition, int totalCount);
+    method public final suspend Object load$lintWithKotlin(androidx.paging.DataSource.Params<java.lang.Integer> params, kotlin.coroutines.Continuation<? super androidx.paging.DataSource.BaseResult<T>> p);
     method @WorkerThread public abstract void loadInitial(androidx.paging.PositionalDataSource.LoadInitialParams params, androidx.paging.PositionalDataSource.LoadInitialCallback<T> callback);
     method @WorkerThread public abstract void loadRange(androidx.paging.PositionalDataSource.LoadRangeParams params, androidx.paging.PositionalDataSource.LoadRangeCallback<T> callback);
     method public final <V> androidx.paging.PositionalDataSource<V> map(androidx.arch.core.util.Function<T,V> function);
diff --git a/paging/common/api/current.txt b/paging/common/api/current.txt
index 115c938..6738de4 100644
--- a/paging/common/api/current.txt
+++ b/paging/common/api/current.txt
@@ -8,6 +8,7 @@
     method @AnyThread public void invalidate();
     method @WorkerThread public boolean isInvalid();
     method public boolean isRetryableError(Throwable error);
+    method public abstract suspend Object load$lintWithKotlin(androidx.paging.DataSource.Params<Key> params, kotlin.coroutines.Continuation<? super androidx.paging.DataSource.BaseResult<Value>> p);
     method public <ToValue> androidx.paging.DataSource<Key,ToValue> map(androidx.arch.core.util.Function<Value,ToValue> function);
     method public <ToValue> androidx.paging.DataSource<Key,ToValue> map(kotlin.jvm.functions.Function1<? super Value,? extends ToValue> function);
     method public <ToValue> androidx.paging.DataSource<Key,ToValue> mapByPage(androidx.arch.core.util.Function<java.util.List<Value>,java.util.List<ToValue>> function);
@@ -42,6 +43,13 @@
     method @AnyThread public void onInvalidated();
   }
 
+  public static final class DataSource.Params<K> {
+    method public int getInitialLoadSize();
+    method public K? getKey();
+    method public int getPageSize();
+    method public boolean getPlaceholdersEnabled();
+  }
+
   public final class DataSourceKt {
     ctor public DataSourceKt();
   }
@@ -49,6 +57,7 @@
   public abstract class ItemKeyedDataSource<Key, Value> extends androidx.paging.DataSource<Key,Value> {
     ctor public ItemKeyedDataSource();
     method public abstract Key getKey(Value item);
+    method public final suspend Object load$lintWithKotlin(androidx.paging.DataSource.Params<Key> params, kotlin.coroutines.Continuation<? super androidx.paging.DataSource.BaseResult<Value>> p);
     method public abstract void loadAfter(androidx.paging.ItemKeyedDataSource.LoadParams<Key> params, androidx.paging.ItemKeyedDataSource.LoadCallback<Value> callback);
     method public abstract void loadBefore(androidx.paging.ItemKeyedDataSource.LoadParams<Key> params, androidx.paging.ItemKeyedDataSource.LoadCallback<Value> callback);
     method public abstract void loadInitial(androidx.paging.ItemKeyedDataSource.LoadInitialParams<Key> params, androidx.paging.ItemKeyedDataSource.LoadInitialCallback<Value> callback);
@@ -84,6 +93,7 @@
 
   public abstract class PageKeyedDataSource<Key, Value> extends androidx.paging.DataSource<Key,Value> {
     ctor public PageKeyedDataSource();
+    method public final suspend Object load$lintWithKotlin(androidx.paging.DataSource.Params<Key> params, kotlin.coroutines.Continuation<? super androidx.paging.DataSource.BaseResult<Value>> p);
     method public abstract void loadAfter(androidx.paging.PageKeyedDataSource.LoadParams<Key> params, androidx.paging.PageKeyedDataSource.LoadCallback<Key,Value> callback);
     method public abstract void loadBefore(androidx.paging.PageKeyedDataSource.LoadParams<Key> params, androidx.paging.PageKeyedDataSource.LoadCallback<Key,Value> callback);
     method public abstract void loadInitial(androidx.paging.PageKeyedDataSource.LoadInitialParams<Key> params, androidx.paging.PageKeyedDataSource.LoadInitialCallback<Key,Value> callback);
@@ -293,6 +303,7 @@
     ctor public PositionalDataSource();
     method public static final int computeInitialLoadPosition(androidx.paging.PositionalDataSource.LoadInitialParams params, int totalCount);
     method public static final int computeInitialLoadSize(androidx.paging.PositionalDataSource.LoadInitialParams params, int initialLoadPosition, int totalCount);
+    method public final suspend Object load$lintWithKotlin(androidx.paging.DataSource.Params<java.lang.Integer> params, kotlin.coroutines.Continuation<? super androidx.paging.DataSource.BaseResult<T>> p);
     method @WorkerThread public abstract void loadInitial(androidx.paging.PositionalDataSource.LoadInitialParams params, androidx.paging.PositionalDataSource.LoadInitialCallback<T> callback);
     method @WorkerThread public abstract void loadRange(androidx.paging.PositionalDataSource.LoadRangeParams params, androidx.paging.PositionalDataSource.LoadRangeCallback<T> callback);
     method public final <V> androidx.paging.PositionalDataSource<V> map(androidx.arch.core.util.Function<T,V> function);
diff --git a/paging/common/api/restricted_2.2.0-alpha01.txt b/paging/common/api/restricted_2.2.0-alpha01.txt
index 1e90286..559c9d7 100644
--- a/paging/common/api/restricted_2.2.0-alpha01.txt
+++ b/paging/common/api/restricted_2.2.0-alpha01.txt
@@ -8,7 +8,6 @@
 
 
 
-
   public abstract class PagedList<T> extends java.util.AbstractList<T> {
     method @RestrictTo(androidx.annotation.RestrictTo.Scope.LIBRARY_GROUP) public static final <K, T> com.google.common.util.concurrent.ListenableFuture<androidx.paging.PagedList<T>> create(androidx.paging.DataSource<K,T> dataSource, kotlinx.coroutines.CoroutineScope coroutineScope, java.util.concurrent.Executor notifyExecutor, java.util.concurrent.Executor fetchExecutor, java.util.concurrent.Executor initialLoadExecutor, androidx.paging.PagedList.BoundaryCallback<T>? boundaryCallback, androidx.paging.PagedList.Config config, K? key);
     method @RestrictTo(androidx.annotation.RestrictTo.Scope.LIBRARY_GROUP) public final androidx.paging.PagedStorage<T> getStorage();
diff --git a/paging/common/api/restricted_current.txt b/paging/common/api/restricted_current.txt
index 1e90286..559c9d7 100644
--- a/paging/common/api/restricted_current.txt
+++ b/paging/common/api/restricted_current.txt
@@ -8,7 +8,6 @@
 
 
 
-
   public abstract class PagedList<T> extends java.util.AbstractList<T> {
     method @RestrictTo(androidx.annotation.RestrictTo.Scope.LIBRARY_GROUP) public static final <K, T> com.google.common.util.concurrent.ListenableFuture<androidx.paging.PagedList<T>> create(androidx.paging.DataSource<K,T> dataSource, kotlinx.coroutines.CoroutineScope coroutineScope, java.util.concurrent.Executor notifyExecutor, java.util.concurrent.Executor fetchExecutor, java.util.concurrent.Executor initialLoadExecutor, androidx.paging.PagedList.BoundaryCallback<T>? boundaryCallback, androidx.paging.PagedList.Config config, K? key);
     method @RestrictTo(androidx.annotation.RestrictTo.Scope.LIBRARY_GROUP) public final androidx.paging.PagedStorage<T> getStorage();
diff --git a/paging/common/build.gradle b/paging/common/build.gradle
index ee2e5cb..13d043d 100644
--- a/paging/common/build.gradle
+++ b/paging/common/build.gradle
@@ -14,12 +14,14 @@
  * limitations under the License.
  */
 
-import static androidx.build.dependencies.DependenciesKt.*
+
+import androidx.build.AndroidXExtension
 import androidx.build.LibraryGroups
 import androidx.build.LibraryVersions
-import androidx.build.AndroidXExtension;
 import androidx.build.Publish
 
+import static androidx.build.dependencies.DependenciesKt.*
+
 plugins {
     id("AndroidXPlugin")
     id("java")
@@ -38,8 +40,9 @@
     testImplementation MOCKITO_KOTLIN, {
         exclude group: 'org.mockito' // to keep control on the mockito version
     }
-    testCompile(GUAVA)
     testImplementation project(':internal-testutils-common')
+    testImplementation(KOTLIN_TEST_COMMON)
+    testImplementation(TRUTH)
 }
 
 androidx {
diff --git a/paging/common/src/main/kotlin/androidx/paging/DataSource.kt b/paging/common/src/main/kotlin/androidx/paging/DataSource.kt
index b610179..a2fe5aa 100644
--- a/paging/common/src/main/kotlin/androidx/paging/DataSource.kt
+++ b/paging/common/src/main/kotlin/androidx/paging/DataSource.kt
@@ -21,7 +21,6 @@
 import androidx.annotation.VisibleForTesting
 import androidx.annotation.WorkerThread
 import androidx.arch.core.util.Function
-import com.google.common.util.concurrent.ListenableFuture
 import java.util.concurrent.CopyOnWriteArrayList
 import java.util.concurrent.Executor
 import java.util.concurrent.atomic.AtomicBoolean
@@ -443,16 +442,9 @@
     /**
      * @param K Type of the key used to query the [DataSource].
      * @property key Can be `null` for init, otherwise non-null
-     *
-     * @hide
      */
-    @RestrictTo(RestrictTo.Scope.LIBRARY)
     class Params<K : Any> internal constructor(
-        /**
-         * @hide
-         */
-        @RestrictTo(RestrictTo.Scope.LIBRARY)
-        val type: LoadType,
+        internal val type: LoadType,
         val key: K?,
         val initialLoadSize: Int,
         val placeholdersEnabled: Boolean,
@@ -575,7 +567,7 @@
         ITEM_KEYED
     }
 
-    internal abstract fun load(params: Params<Key>): ListenableFuture<out BaseResult<Value>>
+    internal abstract suspend fun load(params: Params<Key>): BaseResult<Value>
 
     internal abstract fun getKeyInternal(item: Value): Key
 
diff --git a/paging/common/src/main/kotlin/androidx/paging/ItemKeyedDataSource.kt b/paging/common/src/main/kotlin/androidx/paging/ItemKeyedDataSource.kt
index e4a8656..53942b3 100644
--- a/paging/common/src/main/kotlin/androidx/paging/ItemKeyedDataSource.kt
+++ b/paging/common/src/main/kotlin/androidx/paging/ItemKeyedDataSource.kt
@@ -20,10 +20,11 @@
 import androidx.arch.core.util.Function
 import androidx.concurrent.futures.ResolvableFuture
 import androidx.paging.DataSource.KeyType.ITEM_KEYED
+import androidx.paging.futures.await
 import com.google.common.util.concurrent.ListenableFuture
 
 /**
  * Incremental data loader for paging keyed content, where loaded content uses previously loaded
  * items as input to future loads.
  *
  * Implement a DataSource using ItemKeyedDataSource if you need to use data from item `N - 1`
@@ -193,7 +194,7 @@
     }
 
     @Suppress("RedundantVisibilityModifier") // Metalava doesn't inherit visibility properly.
-    internal final override fun load(params: Params<Key>): ListenableFuture<out BaseResult<Value>> {
+    internal final override suspend fun load(params: Params<Key>): BaseResult<Value> {
         when (params.type) {
             LoadType.INITIAL -> {
                 val initParams = LoadInitialParams(
@@ -201,15 +202,15 @@
                     params.initialLoadSize,
                     params.placeholdersEnabled
                 )
-                return loadInitial(initParams)
+                return loadInitial(initParams).await()
             }
             LoadType.START -> {
                 val loadParams = LoadParams(params.key!!, params.pageSize)
-                return loadBefore(loadParams)
+                return loadBefore(loadParams).await()
             }
             LoadType.END -> {
                 val loadParams = LoadParams(params.key!!, params.pageSize)
-                return loadAfter(loadParams)
+                return loadAfter(loadParams).await()
             }
         }
     }
@@ -233,14 +234,19 @@
                     future.setException(error)
                 }
             }
-            loadInitial(
-                LoadInitialParams(
-                    params.requestedInitialKey,
-                    params.requestedLoadSize,
-                    params.placeholdersEnabled
-                ),
-                callback
-            )
+
+            try {
+                loadInitial(
+                    LoadInitialParams(
+                        params.requestedInitialKey,
+                        params.requestedLoadSize,
+                        params.placeholdersEnabled
+                    ),
+                    callback
+                )
+            } catch (e: Exception) {
+                future.setException(e)
+            }
         }
         return future
     }
diff --git a/paging/common/src/main/kotlin/androidx/paging/PageKeyedDataSource.kt b/paging/common/src/main/kotlin/androidx/paging/PageKeyedDataSource.kt
index db7d8dd..d481c59 100644
--- a/paging/common/src/main/kotlin/androidx/paging/PageKeyedDataSource.kt
+++ b/paging/common/src/main/kotlin/androidx/paging/PageKeyedDataSource.kt
@@ -19,6 +19,7 @@
 import androidx.arch.core.util.Function
 import androidx.concurrent.futures.ResolvableFuture
 import androidx.paging.DataSource.KeyType.PAGE_KEYED
+import androidx.paging.futures.await
 import com.google.common.util.concurrent.ListenableFuture
 
 /**
@@ -234,28 +235,25 @@
         }
     }
 
+    /**
+     * @throws IllegalArgumentException when passed an unsupported load type.
+     */
     @Suppress("RedundantVisibilityModifier") // Metalava doesn't inherit visibility properly.
-    internal final override fun load(params: Params<Key>): ListenableFuture<out BaseResult<Value>> {
+    internal final override suspend fun load(params: Params<Key>): BaseResult<Value> {
         if (params.type == LoadType.INITIAL) {
             val initParams = LoadInitialParams<Key>(
                 params.initialLoadSize,
                 params.placeholdersEnabled
             )
-            return loadInitial(initParams)
+            return loadInitial(initParams).await()
         } else {
-            if (params.key == null) {
-                // null key, immediately return empty data
-                val future = ResolvableFuture.create<BaseResult<Value>>()
-                future.set(BaseResult.empty())
-                return future
-            }
+            // null key, return empty data
+            if (params.key == null) return BaseResult.empty()
 
             val loadParams = LoadParams(params.key, params.pageSize)
-
-            if (params.type == LoadType.START) {
-                return loadBefore(loadParams)
-            } else if (params.type == LoadType.END) {
-                return loadAfter(loadParams)
+            when {
+                params.type == LoadType.START -> return loadBefore(loadParams).await()
+                params.type == LoadType.END -> return loadAfter(loadParams).await()
             }
         }
         throw IllegalArgumentException("Unsupported type " + params.type.toString())
diff --git a/paging/common/src/main/kotlin/androidx/paging/PagedList.kt b/paging/common/src/main/kotlin/androidx/paging/PagedList.kt
index 884ca15..4a74c79 100644
--- a/paging/common/src/main/kotlin/androidx/paging/PagedList.kt
+++ b/paging/common/src/main/kotlin/androidx/paging/PagedList.kt
@@ -30,10 +30,12 @@
 import androidx.paging.PagedList.LoadState
 import androidx.paging.PagedList.LoadType
 import androidx.paging.futures.DirectExecutor
+import androidx.paging.futures.future
 import androidx.paging.futures.transform
 import com.google.common.util.concurrent.ListenableFuture
 import kotlinx.coroutines.CoroutineScope
 import kotlinx.coroutines.GlobalScope
+import kotlinx.coroutines.asCoroutineDispatcher
 import java.lang.ref.WeakReference
 import java.util.AbstractList
 import java.util.ArrayList
@@ -199,7 +201,11 @@
                 config.pageSize
             )
 
-            return dataSource.load(params).transform(
+            val future = coroutineScope.future(initialLoadExecutor.asCoroutineDispatcher()) {
+                dataSource.load(params)
+            }
+
+            return future.transform(
                 Function { initialResult ->
                     dataSource.initExecutor(fetchExecutor)
                     ContiguousPagedList(
diff --git a/paging/common/src/main/kotlin/androidx/paging/PagedSourceWrapper.kt b/paging/common/src/main/kotlin/androidx/paging/PagedSourceWrapper.kt
index eb5fb83..a9c64fc 100644
--- a/paging/common/src/main/kotlin/androidx/paging/PagedSourceWrapper.kt
+++ b/paging/common/src/main/kotlin/androidx/paging/PagedSourceWrapper.kt
@@ -16,8 +16,6 @@
 
 package androidx.paging
 
-import androidx.paging.futures.await
-
 /**
  * A wrapper around [DataSource] which adapts it to the [PagedSource] API.
  */
@@ -55,7 +53,7 @@
             params.pageSize
         )
 
-        return dataSource.load(dataSourceParams).await().toLoadResult()
+        return dataSource.load(dataSourceParams).toLoadResult()
     }
 
     override fun isRetryableError(error: Throwable) = dataSource.isRetryableError(error)
diff --git a/paging/common/src/main/kotlin/androidx/paging/PositionalDataSource.kt b/paging/common/src/main/kotlin/androidx/paging/PositionalDataSource.kt
index 45eb559..f6df55e 100644
--- a/paging/common/src/main/kotlin/androidx/paging/PositionalDataSource.kt
+++ b/paging/common/src/main/kotlin/androidx/paging/PositionalDataSource.kt
@@ -23,6 +23,7 @@
 import androidx.concurrent.futures.ResolvableFuture
 import androidx.paging.DataSource.KeyType.POSITIONAL
 import androidx.paging.PositionalDataSource.LoadInitialCallback
+import androidx.paging.futures.await
 import com.google.common.util.concurrent.ListenableFuture
 
 /**
@@ -350,7 +351,7 @@
     }
 
     @Suppress("RedundantVisibilityModifier") // Metalava doesn't inherit visibility properly.
-    internal final override fun load(params: Params<Int>): ListenableFuture<out BaseResult<T>> {
+    internal final override suspend fun load(params: Params<Int>): BaseResult<T> {
         if (params.type == LoadType.INITIAL) {
             var initialPosition = 0
             var initialLoadSize = params.initialLoadSize
@@ -376,7 +377,7 @@
                 params.pageSize,
                 params.placeholdersEnabled
             )
-            return loadInitial(initParams)
+            return loadInitial(initParams).await()
         } else {
             var startIndex = params.key!!
             var loadSize = params.pageSize
@@ -384,7 +385,7 @@
                 loadSize = minOf(loadSize, startIndex + 1)
                 startIndex = startIndex - loadSize + 1
             }
-            return loadRange(LoadRangeParams(startIndex, loadSize))
+            return loadRange(LoadRangeParams(startIndex, loadSize)).await()
         }
     }
 
diff --git a/paging/common/src/main/kotlin/androidx/paging/WrapperDataSource.kt b/paging/common/src/main/kotlin/androidx/paging/WrapperDataSource.kt
index 696285b..f7fd3c2 100644
--- a/paging/common/src/main/kotlin/androidx/paging/WrapperDataSource.kt
+++ b/paging/common/src/main/kotlin/androidx/paging/WrapperDataSource.kt
@@ -17,9 +17,6 @@
 package androidx.paging
 
 import androidx.arch.core.util.Function
-import androidx.paging.futures.DirectExecutor
-import androidx.paging.futures.transform
-import com.google.common.util.concurrent.ListenableFuture
 import java.util.IdentityHashMap
 
 /**
@@ -67,13 +64,10 @@
         }
     }
 
-    override fun load(params: Params<Key>): ListenableFuture<out BaseResult<ValueTo>> =
-        source.load(params).transform(
-            Function { input ->
-                val result = BaseResult.convert(input, listFunction)
-                stashKeysIfNeeded(input.data, result.data)
-                result
-            },
-            DirectExecutor
-        )
+    override suspend fun load(params: Params<Key>): BaseResult<ValueTo> {
+        val input = source.load(params)
+        val result = BaseResult.convert(input, listFunction)
+        stashKeysIfNeeded(input.data, result.data)
+        return result
+    }
 }
diff --git a/paging/common/src/main/kotlin/androidx/paging/futures/Futures.kt b/paging/common/src/main/kotlin/androidx/paging/futures/Futures.kt
index e53bdd4..01c74a6 100644
--- a/paging/common/src/main/kotlin/androidx/paging/futures/Futures.kt
+++ b/paging/common/src/main/kotlin/androidx/paging/futures/Futures.kt
@@ -22,15 +22,18 @@
 import androidx.annotation.RestrictTo
 import androidx.arch.core.util.Function
 import androidx.concurrent.futures.ResolvableFuture
-
 import com.google.common.util.concurrent.ListenableFuture
 import kotlinx.coroutines.CancellableContinuation
+import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.launch
 import kotlinx.coroutines.suspendCancellableCoroutine
 
 import java.util.concurrent.CancellationException
 import java.util.concurrent.ExecutionException
 import java.util.concurrent.Executor
 import kotlin.coroutines.Continuation
+import kotlin.coroutines.CoroutineContext
+import kotlin.coroutines.EmptyCoroutineContext
 import kotlin.coroutines.resume
 import kotlin.coroutines.resumeWithException
 
@@ -195,3 +198,29 @@
         cont?.resumeWithException(throwable)
     }
 }
+
+/**
+ * Launches a new coroutine with optionally provided [coroutineContext] and returns its result as an
+ * implementation of [ResolvableFuture].
+ *
+ * @param coroutineContext additional to [CoroutineScope.coroutineContext] context of the coroutine.
+ * @param block the coroutine code
+ *
+ * @hide
+ */
+@RestrictTo(RestrictTo.Scope.LIBRARY) // Redundant to hide from Metalava b/135947782.
+internal fun <T> CoroutineScope.future(
+    coroutineContext: CoroutineContext = EmptyCoroutineContext,
+    block: suspend CoroutineScope.() -> T
+): ListenableFuture<T> {
+    val future = ResolvableFuture.create<T>()
+    launch(coroutineContext) {
+        try {
+            future.set(block())
+        } catch (e: Throwable) {
+            future.setException(e)
+        }
+    }
+
+    return future
+}
\ No newline at end of file
diff --git a/paging/common/src/test/kotlin/androidx/paging/ItemKeyedDataSourceTest.kt b/paging/common/src/test/kotlin/androidx/paging/ItemKeyedDataSourceTest.kt
index a6ccc5d..445d687 100644
--- a/paging/common/src/test/kotlin/androidx/paging/ItemKeyedDataSourceTest.kt
+++ b/paging/common/src/test/kotlin/androidx/paging/ItemKeyedDataSourceTest.kt
@@ -17,6 +17,7 @@
 package androidx.paging
 
 import androidx.paging.futures.DirectExecutor
+import androidx.paging.futures.assertFailsWithCause
 import com.nhaarman.mockitokotlin2.capture
 import com.nhaarman.mockitokotlin2.mock
 import kotlinx.coroutines.GlobalScope
@@ -337,28 +338,44 @@
         it.onResult(elevenLetterList, 0, 12)
     }
 
-    @Test(expected = IllegalArgumentException::class)
-    fun loadInitialCallbackListTooBig() = performLoadInitial {
-        // LoadInitialCallback can't accept pos + list > totalCount
-        it.onResult(listOf("a", "b", "c"), 0, 2)
+    @Test
+    fun loadInitialCallbackListTooBig() {
+        assertFailsWithCause<IllegalArgumentException> {
+            performLoadInitial {
+                // LoadInitialCallback can't accept pos + list > totalCount
+                it.onResult(listOf("a", "b", "c"), 0, 2)
+            }
+        }
     }
 
-    @Test(expected = IllegalArgumentException::class)
-    fun loadInitialCallbackPositionTooLarge() = performLoadInitial {
-        // LoadInitialCallback can't accept pos + list > totalCount
-        it.onResult(listOf("a", "b"), 1, 2)
+    @Test
+    fun loadInitialCallbackPositionTooLarge() {
+        assertFailsWithCause<IllegalArgumentException> {
+            performLoadInitial {
+                // LoadInitialCallback can't accept pos + list > totalCount
+                it.onResult(listOf("a", "b"), 1, 2)
+            }
+        }
     }
 
-    @Test(expected = IllegalArgumentException::class)
-    fun loadInitialCallbackPositionNegative() = performLoadInitial {
-        // LoadInitialCallback can't accept negative position
-        it.onResult(listOf("a", "b", "c"), -1, 2)
+    @Test
+    fun loadInitialCallbackPositionNegative() {
+        assertFailsWithCause<IllegalArgumentException> {
+            performLoadInitial {
+                // LoadInitialCallback can't accept negative position
+                it.onResult(listOf("a", "b", "c"), -1, 2)
+            }
+        }
     }
 
-    @Test(expected = IllegalArgumentException::class)
-    fun loadInitialCallbackEmptyCannotHavePlaceholders() = performLoadInitial {
-        // LoadInitialCallback can't accept empty result unless data set is empty
-        it.onResult(emptyList(), 0, 2)
+    @Test
+    fun loadInitialCallbackEmptyCannotHavePlaceholders() {
+        assertFailsWithCause<IllegalArgumentException> {
+            performLoadInitial {
+                // LoadInitialCallback can't accept empty result unless data set is empty
+                it.onResult(emptyList(), 0, 2)
+            }
+        }
     }
 
     private abstract class WrapperDataSource<K : Any, A : Any, B : Any>(
diff --git a/paging/common/src/test/kotlin/androidx/paging/PageKeyedDataSourceTest.kt b/paging/common/src/test/kotlin/androidx/paging/PageKeyedDataSourceTest.kt
index 86a0bf5..2f86f95 100644
--- a/paging/common/src/test/kotlin/androidx/paging/PageKeyedDataSourceTest.kt
+++ b/paging/common/src/test/kotlin/androidx/paging/PageKeyedDataSourceTest.kt
@@ -17,6 +17,7 @@
 package androidx.paging
 
 import androidx.paging.futures.DirectExecutor
+import androidx.paging.futures.assertFailsWithCause
 import androidx.testutils.TestExecutor
 import kotlinx.coroutines.GlobalScope
 import org.junit.Assert.assertEquals
@@ -176,28 +177,44 @@
         it.onResult(elevenLetterList, 0, 12, null, null)
     }
 
-    @Test(expected = IllegalArgumentException::class)
-    fun loadInitialCallbackListTooBig() = performLoadInitial {
-        // LoadInitialCallback can't accept pos + list > totalCount
-        it.onResult(listOf("a", "b", "c"), 0, 2, null, null)
+    @Test
+    fun loadInitialCallbackListTooBig() {
+        assertFailsWithCause<IllegalArgumentException> {
+            performLoadInitial {
+                // LoadInitialCallback can't accept pos + list > totalCount
+                it.onResult(listOf("a", "b", "c"), 0, 2, null, null)
+            }
+        }
     }
 
-    @Test(expected = IllegalArgumentException::class)
-    fun loadInitialCallbackPositionTooLarge() = performLoadInitial {
-        // LoadInitialCallback can't accept pos + list > totalCount
-        it.onResult(listOf("a", "b"), 1, 2, null, null)
+    @Test
+    fun loadInitialCallbackPositionTooLarge() {
+        assertFailsWithCause<IllegalArgumentException> {
+            performLoadInitial {
+                // LoadInitialCallback can't accept pos + list > totalCount
+                it.onResult(listOf("a", "b"), 1, 2, null, null)
+            }
+        }
     }
 
-    @Test(expected = IllegalArgumentException::class)
-    fun loadInitialCallbackPositionNegative() = performLoadInitial {
-        // LoadInitialCallback can't accept negative position
-        it.onResult(listOf("a", "b", "c"), -1, 2, null, null)
+    @Test
+    fun loadInitialCallbackPositionNegative() {
+        assertFailsWithCause<IllegalArgumentException> {
+            performLoadInitial {
+                // LoadInitialCallback can't accept negative position
+                it.onResult(listOf("a", "b", "c"), -1, 2, null, null)
+            }
+        }
     }
 
-    @Test(expected = IllegalArgumentException::class)
-    fun loadInitialCallbackEmptyCannotHavePlaceholders() = performLoadInitial {
-        // LoadInitialCallback can't accept empty result unless data set is empty
-        it.onResult(emptyList(), 0, 2, null, null)
+    @Test
+    fun loadInitialCallbackEmptyCannotHavePlaceholders() {
+        assertFailsWithCause<IllegalArgumentException> {
+            performLoadInitial {
+                // LoadInitialCallback can't accept empty result unless data set is empty
+                it.onResult(emptyList(), 0, 2, null, null)
+            }
+        }
     }
 
     @Test
diff --git a/paging/common/src/test/kotlin/androidx/paging/futures/Futures.kt b/paging/common/src/test/kotlin/androidx/paging/futures/Futures.kt
new file mode 100644
index 0000000..873a3ef
--- /dev/null
+++ b/paging/common/src/test/kotlin/androidx/paging/futures/Futures.kt
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2019 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package androidx.paging.futures
+
+import com.google.common.truth.ThrowableSubject
+import com.google.common.truth.Truth.assertThat
+import java.util.concurrent.ExecutionException
+import kotlin.test.assertFails
+
+/**
+ * Helper that asserts [body] fails with an exception whose cause is a [T], and returns a
+ * [ThrowableSubject] of the cause to allow further assertions.
+ *
+ * E.g., [ExecutionException.cause] thrown from
+ * [androidx.concurrent.futures.AbstractResolvableFuture.setException]
+ */
+internal inline fun <reified T : Throwable> assertFailsWithCause(
+    body: () -> Unit
+): ThrowableSubject {
+    val exception = assertFails { body() }
+    val throwableSubject = assertThat(exception.cause)
+    throwableSubject.isInstanceOf(T::class.java)
+    return throwableSubject
+}