| /* |
| * Copyright 2020 The Android Open Source Project |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package androidx.paging |
| |
| import kotlinx.coroutines.CoroutineScope |
| import kotlinx.coroutines.Job |
| import kotlinx.coroutines.channels.Channel |
| import kotlinx.coroutines.channels.SendChannel |
| import kotlinx.coroutines.coroutineScope |
| import kotlinx.coroutines.flow.Flow |
| import kotlinx.coroutines.flow.buffer |
| import kotlinx.coroutines.flow.flow |
| import kotlinx.coroutines.flow.internal.FusibleFlow |
| import kotlinx.coroutines.launch |
| import kotlinx.coroutines.suspendCancellableCoroutine |
| import kotlin.coroutines.resume |
| |
| /** |
| * This is a simplified channelFlow implementation as a temporary measure until channel flow |
| * leaves experimental state. |
| * |
| * The exact same implementation is not possible due to [FusibleFlow] being an internal API. To |
| * get close to that implementation, internally we use a [Channel.RENDEZVOUS] channel and use a |
| * [buffer] ([Channel.BUFFERED]) operator on the resulting Flow. This gives us a close behavior |
| * where the default is buffered and any followup buffer operation will result in +1 value being |
| * produced. |
| */ |
| internal fun <T> simpleChannelFlow( |
| block: suspend SimpleProducerScope<T>.() -> Unit |
| ): Flow<T> { |
| return flow { |
| coroutineScope { |
| val channel = Channel<T>(capacity = Channel.RENDEZVOUS) |
| val producer = launch { |
| try { |
| // run producer in a separate inner scope to ensure we wait for its children |
| // to finish, in case it does more launches inside. |
| coroutineScope { |
| val producerScopeImpl = SimpleProducerScopeImpl( |
| scope = this, |
| channel = channel, |
| ) |
| producerScopeImpl.block() |
| } |
| channel.close() |
| } catch (t: Throwable) { |
| channel.close(t) |
| } |
| } |
| for (item in channel) { |
| emit(item) |
| } |
| // in case channel closed before producer completes, cancel the producer. |
| producer.cancel() |
| } |
| }.buffer(Channel.BUFFERED) |
| } |
| |
| internal interface SimpleProducerScope<T> : CoroutineScope, SendChannel<T> { |
| val channel: SendChannel<T> |
| suspend fun awaitClose(block: () -> Unit) |
| } |
| |
| internal class SimpleProducerScopeImpl<T>( |
| scope: CoroutineScope, |
| override val channel: SendChannel<T>, |
| ) : SimpleProducerScope<T>, CoroutineScope by scope, SendChannel<T> by channel { |
| override suspend fun awaitClose(block: () -> Unit) { |
| try { |
| val job = checkNotNull(coroutineContext[Job]) { |
| "Internal error, context should have a job." |
| } |
| suspendCancellableCoroutine<Unit> { cont -> |
| job.invokeOnCompletion { |
| cont.resume(Unit) |
| } |
| } |
| } finally { |
| block() |
| } |
| } |
| } |