Merge "Use SessionManager lock to close session in SessionWorker" into androidx-main
diff --git a/glance/glance/src/main/java/androidx/glance/session/SessionWorker.kt b/glance/glance/src/main/java/androidx/glance/session/SessionWorker.kt
index 544dd2d..8928858 100644
--- a/glance/glance/src/main/java/androidx/glance/session/SessionWorker.kt
+++ b/glance/glance/src/main/java/androidx/glance/session/SessionWorker.kt
@@ -34,11 +34,13 @@
import kotlinx.coroutines.CoroutineExceptionHandler
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
+import kotlinx.coroutines.NonCancellable
import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.collectLatest
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.launch
+import kotlinx.coroutines.withContext
/**
* Options to configure [SessionWorker] timeouts.
@@ -126,7 +128,23 @@
}
)
} finally {
- session.close()
+ // Get session manager lock to close session to prevent a race where an observer
+ // who is checking session state (e.g. GlanceAppWidget.update) sees this session
+ // as running before trying to send an event. Without the lock, it may happen
+ // that we close right after they see us as running, which will cause an error
+ // when they try to send an event.
+ // With the lock, if they see us as running and send an event right before this
+ // block, then the event will be unhandled.
+ // After this block, observers will see this session as closed, so they will
+ // start a new one instead of trying to send events to this one.
+ // We must use NonCancellable here because it provides a Job that has not been
+ // cancelled. The withTimerOrNull Job that the session runs in has already been
+ // cancelled, so suspending with that Job would throw a CancellationException.
+ withContext(NonCancellable) {
+ sessionManager.runWithLock {
+ closeSession(session.key)
+ }
+ }
}
Result.success()
}
@@ -153,8 +171,7 @@
val effectExceptionHandler = CoroutineExceptionHandler { _, throwable ->
launch {
session.onCompositionError(context, throwable)
- session.close()
- uiReady.emit(true)
+ this@runSession.cancel("Error in composition effect coroutine", throwable);
}
}
val effectCoroutineContext = effectJobFactory().let { job ->
@@ -164,68 +181,69 @@
val recomposer = Recomposer(effectCoroutineContext)
val composition = Composition(Applier(root), recomposer)
- launch(frameClock) {
- try {
- composition.setContent(session.provideGlance(context))
- recomposer.runRecomposeAndApplyChanges()
- } catch (e: CancellationException) {
- // do nothing if we are cancelled.
- } catch (throwable: Throwable) {
- session.onCompositionError(context, throwable)
- session.close()
- // Set uiReady to true to resume coroutine waiting on it.
- uiReady.emit(true)
- }
- }
- launch {
- var lastRecomposeCount = recomposer.changeCount
- recomposer.currentState.collectLatest { state ->
- if (SessionWorker.DEBUG)
- Log.d(SessionWorker.TAG, "Recomposer(${session.key}): currentState=$state")
- when (state) {
- Recomposer.State.Idle -> {
- // Only update the session when a change has actually occurred. The
- // Recomposer may sometimes wake up due to changes in other
- // compositions. Also update the session if we have not sent an initial
- // tree yet.
- if (recomposer.changeCount > lastRecomposeCount || !uiReady.value) {
- if (SessionWorker.DEBUG)
- Log.d(SessionWorker.TAG, "UI tree updated (${session.key})")
- val processed = session.processEmittableTree(
- context,
- root.copy() as EmittableWithChildren
- )
- // If the UI has been processed for the first time, set uiReady to true
- // and start the timeout.
- if (!uiReady.value && processed) {
- uiReady.emit(true)
- startTimer(timeouts.initialTimeout)
- }
- }
- lastRecomposeCount = recomposer.changeCount
- }
- Recomposer.State.ShutDown -> cancel()
- else -> {}
+ try {
+ launch(frameClock) {
+ try {
+ composition.setContent(session.provideGlance(context))
+ recomposer.runRecomposeAndApplyChanges()
+ } catch (e: CancellationException) {
+ // do nothing if we are cancelled.
+ } catch (throwable: Throwable) {
+ session.onCompositionError(context, throwable)
+ this@runSession.cancel("Error in recomposition coroutine", throwable);
}
}
- }
+ launch {
+ var lastRecomposeCount = recomposer.changeCount
+ recomposer.currentState.collectLatest { state ->
+ if (SessionWorker.DEBUG)
+ Log.d(SessionWorker.TAG, "Recomposer(${session.key}): currentState=$state")
+ when (state) {
+ Recomposer.State.Idle -> {
+ // Only update the session when a change has actually occurred. The
+ // Recomposer may sometimes wake up due to changes in other
+ // compositions. Also update the session if we have not sent an initial
+ // tree yet.
+ if (recomposer.changeCount > lastRecomposeCount || !uiReady.value) {
+ if (SessionWorker.DEBUG)
+ Log.d(SessionWorker.TAG, "UI tree updated (${session.key})")
+ val processed = session.processEmittableTree(
+ context,
+ root.copy() as EmittableWithChildren
+ )
+ // If the UI has been processed for the first time, set uiReady to true
+ // and start the timeout.
+ if (!uiReady.value && processed) {
+ uiReady.emit(true)
+ startTimer(timeouts.initialTimeout)
+ }
+ }
+ lastRecomposeCount = recomposer.changeCount
+ }
+ Recomposer.State.ShutDown -> cancel()
+ else -> {}
+ }
+ }
+ }
- // Wait until the Emittable tree has been processed at least once before receiving events.
- uiReady.first { it }
- session.receiveEvents(context) {
- // If time is running low, add time to make sure that we have time to respond to this
- // event.
- if (timeLeft < timeouts.additionalTime) addTime(timeouts.additionalTime)
- if (SessionWorker.DEBUG)
- Log.d(SessionWorker.TAG, "processing event for ${session.key}; $timeLeft left")
- launch { frameClock.startInteractive() }
+ // Wait until the Emittable tree has been processed at least once before receiving events.
+ uiReady.first { it }
+ // receiveEvents will suspend until the session is closed (usually due to widget deletion)
+ // or it is cancelled (in case of composition errors or timeout).
+ session.receiveEvents(context) {
+ // If time is running low, add time to make sure that we have time to respond to this
+ // event.
+ if (timeLeft < timeouts.additionalTime) addTime(timeouts.additionalTime)
+ if (SessionWorker.DEBUG)
+ Log.d(SessionWorker.TAG, "processing event for ${session.key}; $timeLeft left")
+ launch { frameClock.startInteractive() }
+ }
+ } finally {
+ composition.dispose()
+ frameClock.stopInteractive()
+ snapshotMonitor.cancel()
+ recomposer.cancel()
}
-
- composition.dispose()
- frameClock.stopInteractive()
- snapshotMonitor.cancel()
- recomposer.close()
- recomposer.join()
}
/**
diff --git a/glance/glance/src/test/kotlin/androidx/glance/session/SessionWorkerTest.kt b/glance/glance/src/test/kotlin/androidx/glance/session/SessionWorkerTest.kt
index 693218f..6fdf2ee 100644
--- a/glance/glance/src/test/kotlin/androidx/glance/session/SessionWorkerTest.kt
+++ b/glance/glance/src/test/kotlin/androidx/glance/session/SessionWorkerTest.kt
@@ -42,7 +42,10 @@
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.launch
+import kotlinx.coroutines.sync.Mutex
+import kotlinx.coroutines.sync.withLock
import kotlinx.coroutines.test.runTest
+import kotlinx.coroutines.yield
import org.junit.Before
import org.junit.Test
import org.junit.runner.RunWith
@@ -297,16 +300,54 @@
sessionManager.scope.startSession(context).first()
sessionManager.scope.closeSession()
}
+
+ @Test
+ fun sessionWorkerClosesWithTheLock() = runTest {
+ launch {
+ val result = worker.doWork()
+ assertThat(result).isEqualTo(Result.success())
+ }
+
+ val runError = mutableStateOf(false)
+ val resultFlow = sessionManager.runWithLock {
+ this as TestSessionManager.TestSessionManagerScope
+ startSession(context) {
+ Text("Hello")
+ if (runError.value) throw Throwable()
+ }
+ }
+ resultFlow.first { it.isSuccess }
+
+ // Start the error within the lock
+ sessionManager.runWithLock {
+ runError.value = true
+ resultFlow.first { it.isFailure }
+ // Composition is now cancelled due to error. However, the worker should not be able to
+ // close the session channel until it has the lock. yield() here; the worker will run
+ // until it suspends to wait for the lock.
+ yield()
+ val session = checkNotNull(getSession(SESSION_KEY))
+ assertThat(session.isOpen).isTrue()
+ }
+
+ // Now that we've let go of the lock, yield() again to make sure the worker can resume
+ // from waiting for the lock and close the session.
+ yield()
+ sessionManager.runWithLock {
+ val session = checkNotNull(getSession(SESSION_KEY))
+ assertThat(session.isOpen).isFalse()
+ }
+ }
}
private const val SESSION_KEY = "123"
class TestSessionManager : SessionManager {
val scope = TestSessionManagerScope()
- // No locking needed, tests are run on single threaded environment and is only user of this
- // SessionManager.
+ private val mutex = Mutex()
+
override suspend fun <T> runWithLock(block: suspend SessionManagerScope.() -> T): T =
- scope.block()
+ mutex.withLock { scope.block() }
class TestSessionManagerScope : SessionManagerScope {
private val sessions = mutableMapOf<String, Session>()