blob: 01c74a68ee52b21fa81ceadfd235a10541b5658f [file] [log] [blame]
/*
* Copyright 2019 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// TODO: Remove once paging-runtime is converted to .kt.
@file:JvmName("Futures")
package androidx.paging.futures
import androidx.annotation.RestrictTo
import androidx.arch.core.util.Function
import androidx.concurrent.futures.ResolvableFuture
import com.google.common.util.concurrent.ListenableFuture
import kotlinx.coroutines.CancellableContinuation
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.suspendCancellableCoroutine
import java.util.concurrent.CancellationException
import java.util.concurrent.ExecutionException
import java.util.concurrent.Executor
import kotlin.coroutines.Continuation
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
/**
 * Registers separate success and failure callbacks to be run when this future's computation is
 * complete or, if the computation is already complete, immediately.
 *
 * The callback is run on `executor`. There is no guaranteed ordering of execution of callbacks,
 * but any callback added through this method is guaranteed to be called once the computation is
 * complete.
 *
 * Example:
 * ```
 * val future: ListenableFuture<QueryResult> = ...
 * val executor: Executor = ...
 * future.addCallback(object : FutureCallback<QueryResult> {
 *     override fun onSuccess(value: QueryResult) {
 *         storeInCache(value)
 *     }
 *     override fun onError(throwable: Throwable) {
 *         reportError(throwable)
 *     }
 * }, executor)
 * ```
 *
 * Failure reporting:
 * - An [ExecutionException] is unwrapped, so `onError` receives its cause (or the
 *   [ExecutionException] itself when it has no cause).
 * - Any other throwable raised while reading the result (e.g. a cancellation) is passed to
 *   `onError` unchanged.
 * - An interrupt of the thread running the listener is not reported as a failure of the future:
 *   the result is read uninterruptibly and the thread's interrupt flag is restored afterwards.
 * - Exceptions thrown by `onSuccess` itself are not routed to `onError`.
 *
 * When selecting an executor, note that `directExecutor` is dangerous in some cases. See the
 * discussion in the [ListenableFuture.addListener] documentation. All its warnings about
 * heavyweight listeners are also applicable to heavyweight callbacks passed to this method.
 *
 * For a more general interface to attach a completion listener to a `Future`, see
 * [ListenableFuture.addListener].
 *
 * @param callback The callback to invoke when this future is completed.
 * @param executor The executor to run `callback` when the future completes.
 *
 * @hide
 */
@RestrictTo(RestrictTo.Scope.LIBRARY_GROUP)
fun <V> ListenableFuture<out V>.addCallback(callback: FutureCallback<in V>, executor: Executor) {
    addListener(Runnable {
        val value: V = try {
            getUninterruptibly()
        } catch (e: ExecutionException) {
            callback.onError(e.cause ?: e)
            return@Runnable
        } catch (e: Throwable) {
            callback.onError(e)
            return@Runnable
        }
        // Deliberately outside the try: a throwing onSuccess must not be reported via onError.
        callback.onSuccess(value)
    }, executor)
}

/**
 * Reads the result of this future, retrying when the calling thread is interrupted.
 *
 * Intended for listeners, which only run once the future is done, so `get()` does not block here.
 * `get()` may still throw [InterruptedException] if the thread's interrupt flag is set; that says
 * nothing about the future's outcome, so the read is retried and the flag is re-asserted before
 * returning or throwing.
 */
private fun <V> ListenableFuture<out V>.getUninterruptibly(): V {
    var interrupted = false
    try {
        while (true) {
            try {
                return get()
            } catch (e: InterruptedException) {
                interrupted = true
            }
        }
    } finally {
        if (interrupted) {
            Thread.currentThread().interrupt()
        }
    }
}
/**
 * Derives a new future from this one by applying [function] to its result on [executor].
 *
 * If this future fails, the returned future fails with the same throwable and [function] is not
 * invoked. Example usage:
 *
 * ```
 * ListenableFuture<QueryResult> queryFuture = ...;
 * ListenableFuture<List<Row>> rowsFuture =
 *     transform(queryFuture, QueryResult::getRows, executor);
 * ```
 *
 * Cancellation: cancelling the returned future triggers a `cancel(false)` on this (input) future.
 * If this future is cancelled, the returned future is notified through its failure path with the
 * resulting `CancellationException`.
 *
 * When selecting an executor, note that `directExecutor` is dangerous in some cases. See the
 * discussion in the [ListenableFuture.addListener] documentation. All its warnings about
 * heavyweight listeners are also applicable to heavyweight functions passed to this method.
 *
 * A typical use is converting a serializable object returned from an RPC into a POJO.
 *
 * @param function Maps the result of this future to the result of the returned future.
 * @param executor Executor on which [function] is run.
 * @return A future holding the transformed result.
 *
 * @hide
 */
@RestrictTo(RestrictTo.Scope.LIBRARY_GROUP)
fun <I, O> ListenableFuture<out I>.transform(
    function: Function<in I, out O>,
    executor: Executor
) = transform({ input -> function.apply(input) }, executor)
/**
 * Kotlin-lambda variant of [transform]: returns a future that completes with `function(result)`
 * once this future succeeds.
 *
 * - If this future fails, the returned future is completed exceptionally with the throwable
 *   delivered to the callback, and [function] is not invoked.
 * - If [function] itself throws, the returned future is completed exceptionally with the thrown
 *   throwable, so waiters are not left hanging on a future that never completes.
 * - If the returned future is cancelled, `cancel(false)` is called on this (input) future.
 *
 * @param function Transformation applied to the successful result of this future.
 * @param executor Executor used both for running [function] and for the cancellation callback.
 * @return A future holding the transformed result.
 */
internal inline fun <I, O> ListenableFuture<out I>.transform(
    crossinline function: (input: I) -> O,
    executor: Executor
): ListenableFuture<O> {
    val out = ResolvableFuture.create<O>()

    // Add success/error callback.
    addCallback(object : FutureCallback<I> {
        override fun onSuccess(value: I) {
            // A throwing transformation must still complete `out`; otherwise anything waiting on
            // the returned future would never be notified.
            val transformed = try {
                function(value)
            } catch (t: Throwable) {
                out.setException(t)
                return
            }
            out.set(transformed)
        }

        override fun onError(throwable: Throwable) {
            out.setException(throwable)
        }
    }, executor)

    // Propagate output future's cancellation to input future.
    out.addCallback(object : FutureCallback<O> {
        override fun onSuccess(value: O) {}

        override fun onError(throwable: Throwable) {
            if (throwable is CancellationException) {
                this@transform.cancel(false)
            }
        }
    }, executor)
    return out
}
/**
 * Suspends until this future completes, without blocking a thread, and returns its result.
 *
 * If the future is already done, its result is returned (or its failure rethrown) immediately
 * without suspending. A failure wrapped in [ExecutionException] is rethrown as its original cause.
 *
 * This suspending function is cancellable. If the `Job` of the current coroutine is cancelled or
 * completed while this function is waiting, it stops waiting for the future and immediately
 * resumes with [CancellationException][kotlinx.coroutines.CancellationException].
 *
 * This method is intended for one-shot futures, so cancelling the coroutine also cancels the
 * future. If cancelling the future is undesired, use `future.asDeferred().await()` instead.
 *
 * @hide
 */
@RestrictTo(RestrictTo.Scope.LIBRARY) // Redundant to hide from Metalava b/135947782.
internal suspend fun <T> ListenableFuture<T>.await(): T {
    // Fast path: nothing to wait for, so hand back the outcome directly.
    if (isDone) {
        try {
            return get() as T
        } catch (failure: ExecutionException) {
            // Surface the original cause rather than the ExecutionException wrapper.
            throw failure.cause ?: failure
        }
    }

    return suspendCancellableCoroutine { continuation: CancellableContinuation<T> ->
        val bridge = ContinuationCallback(continuation)
        addCallback(bridge, DirectExecutor)
        continuation.invokeOnCancellation {
            this@await.cancel(false)
            // Drop the future's reference to the continuation once it is no longer needed.
            bridge.cont = null
        }
    }
}
/**
 * [FutureCallback] that forwards a future's outcome to a coroutine [Continuation].
 *
 * [cont] is nullable and volatile so that it can be cleared from outside (set to `null`); once
 * cleared, both callbacks become no-ops.
 */
private class ContinuationCallback<T>(@Volatile @JvmField var cont: Continuation<T>?) :
    FutureCallback<T> {
    override fun onSuccess(value: T) {
        // Read the volatile field once; bail out if the continuation was already cleared.
        val target = cont ?: return
        target.resume(value)
    }

    override fun onError(throwable: Throwable) {
        val target = cont ?: return
        target.resumeWithException(throwable)
    }
}
/**
 * Launches a new coroutine with optionally provided [coroutineContext] and returns its result as an
 * implementation of [ResolvableFuture].
 *
 * The returned future is completed with the value returned by [block], or completed exceptionally
 * with any throwable thrown by [block]. If the coroutine finishes with a failure or cancellation
 * cause without having completed the future (for example, it was cancelled before [block] started
 * running), the future is completed exceptionally with that cause instead of staying pending.
 *
 * @param coroutineContext additional to [CoroutineScope.coroutineContext] context of the coroutine.
 * @param block the coroutine code
 * @return a future reflecting the outcome of [block].
 *
 * @hide
 */
@RestrictTo(RestrictTo.Scope.LIBRARY) // Redundant to hide from Metalava b/135947782.
internal fun <T> CoroutineScope.future(
    coroutineContext: CoroutineContext = EmptyCoroutineContext,
    block: suspend CoroutineScope.() -> T
): ListenableFuture<T> {
    val future = ResolvableFuture.create<T>()
    val job = launch(coroutineContext) {
        try {
            future.set(block())
        } catch (e: Throwable) {
            future.setException(e)
        }
    }
    // Safety net for a coroutine that is cancelled before its body runs: the try/catch above never
    // executes in that case. Has no effect when the body has already completed the future.
    job.invokeOnCompletion { cause ->
        if (cause != null) {
            future.setException(cause)
        }
    }
    return future
}