Merge "Use SessionManager lock to close session in SessionWorker" into androidx-main
diff --git a/glance/glance/src/main/java/androidx/glance/session/SessionWorker.kt b/glance/glance/src/main/java/androidx/glance/session/SessionWorker.kt
index 544dd2d..8928858 100644
--- a/glance/glance/src/main/java/androidx/glance/session/SessionWorker.kt
+++ b/glance/glance/src/main/java/androidx/glance/session/SessionWorker.kt
@@ -34,11 +34,13 @@
 import kotlinx.coroutines.CoroutineExceptionHandler
 import kotlinx.coroutines.Dispatchers
 import kotlinx.coroutines.Job
+import kotlinx.coroutines.NonCancellable
 import kotlinx.coroutines.cancel
 import kotlinx.coroutines.flow.MutableStateFlow
 import kotlinx.coroutines.flow.collectLatest
 import kotlinx.coroutines.flow.first
 import kotlinx.coroutines.launch
+import kotlinx.coroutines.withContext
 
 /**
  * Options to configure [SessionWorker] timeouts.
@@ -126,7 +128,23 @@
                         }
                     )
                 } finally {
-                    session.close()
+                    // Get session manager lock to close session to prevent a race where an observer
+                    // who is checking session state (e.g. GlanceAppWidget.update) sees this session
+                    // as running before trying to send an event. Without the lock, it may happen
+                    // that we close right after they see us as running, which will cause an error
+                    // when they try to send an event.
+                    // With the lock, if they see us as running and send an event right before this
+                    // block, then the event will be unhandled.
+                    // After this block, observers will see this session as closed, so they will
+                    // start a new one instead of trying to send events to this one.
+                    // We must use NonCancellable here because it provides a Job that has not been
+                    // cancelled. The withTimerOrNull Job that the session runs in has already been
+                    // cancelled, so suspending with that Job would throw a CancellationException.
+                    withContext(NonCancellable) {
+                        sessionManager.runWithLock {
+                            closeSession(session.key)
+                        }
+                    }
                 }
                 Result.success()
             }
@@ -153,8 +171,7 @@
     val effectExceptionHandler = CoroutineExceptionHandler { _, throwable ->
         launch {
             session.onCompositionError(context, throwable)
-            session.close()
-            uiReady.emit(true)
+            this@runSession.cancel("Error in composition effect coroutine", throwable);
         }
     }
     val effectCoroutineContext = effectJobFactory().let { job ->
@@ -164,68 +181,69 @@
     val recomposer = Recomposer(effectCoroutineContext)
     val composition = Composition(Applier(root), recomposer)
 
-    launch(frameClock) {
-        try {
-            composition.setContent(session.provideGlance(context))
-            recomposer.runRecomposeAndApplyChanges()
-        } catch (e: CancellationException) {
-            // do nothing if we are cancelled.
-        } catch (throwable: Throwable) {
-            session.onCompositionError(context, throwable)
-            session.close()
-            // Set uiReady to true to resume coroutine waiting on it.
-            uiReady.emit(true)
-        }
-    }
-    launch {
-        var lastRecomposeCount = recomposer.changeCount
-        recomposer.currentState.collectLatest { state ->
-            if (SessionWorker.DEBUG)
-                Log.d(SessionWorker.TAG, "Recomposer(${session.key}): currentState=$state")
-            when (state) {
-                Recomposer.State.Idle -> {
-                    // Only update the session when a change has actually occurred. The
-                    // Recomposer may sometimes wake up due to changes in other
-                    // compositions. Also update the session if we have not sent an initial
-                    // tree yet.
-                    if (recomposer.changeCount > lastRecomposeCount || !uiReady.value) {
-                        if (SessionWorker.DEBUG)
-                            Log.d(SessionWorker.TAG, "UI tree updated (${session.key})")
-                        val processed = session.processEmittableTree(
-                            context,
-                            root.copy() as EmittableWithChildren
-                        )
-                        // If the UI has been processed for the first time, set uiReady to true
-                        // and start the timeout.
-                        if (!uiReady.value && processed) {
-                            uiReady.emit(true)
-                            startTimer(timeouts.initialTimeout)
-                        }
-                    }
-                    lastRecomposeCount = recomposer.changeCount
-                }
-                Recomposer.State.ShutDown -> cancel()
-                else -> {}
+    try {
+        launch(frameClock) {
+            try {
+                composition.setContent(session.provideGlance(context))
+                recomposer.runRecomposeAndApplyChanges()
+            } catch (e: CancellationException) {
+                // do nothing if we are cancelled.
+            } catch (throwable: Throwable) {
+                session.onCompositionError(context, throwable)
+                this@runSession.cancel("Error in recomposition coroutine", throwable);
             }
         }
-    }
+        launch {
+            var lastRecomposeCount = recomposer.changeCount
+            recomposer.currentState.collectLatest { state ->
+                if (SessionWorker.DEBUG)
+                    Log.d(SessionWorker.TAG, "Recomposer(${session.key}): currentState=$state")
+                when (state) {
+                    Recomposer.State.Idle -> {
+                        // Only update the session when a change has actually occurred. The
+                        // Recomposer may sometimes wake up due to changes in other
+                        // compositions. Also update the session if we have not sent an initial
+                        // tree yet.
+                        if (recomposer.changeCount > lastRecomposeCount || !uiReady.value) {
+                            if (SessionWorker.DEBUG)
+                                Log.d(SessionWorker.TAG, "UI tree updated (${session.key})")
+                            val processed = session.processEmittableTree(
+                                context,
+                                root.copy() as EmittableWithChildren
+                            )
+                            // If the UI has been processed for the first time, set uiReady to true
+                            // and start the timeout.
+                            if (!uiReady.value && processed) {
+                                uiReady.emit(true)
+                                startTimer(timeouts.initialTimeout)
+                            }
+                        }
+                        lastRecomposeCount = recomposer.changeCount
+                    }
+                    Recomposer.State.ShutDown -> cancel()
+                    else -> {}
+                }
+            }
+        }
 
-    // Wait until the Emittable tree has been processed at least once before receiving events.
-    uiReady.first { it }
-    session.receiveEvents(context) {
-        // If time is running low, add time to make sure that we have time to respond to this
-        // event.
-        if (timeLeft < timeouts.additionalTime) addTime(timeouts.additionalTime)
-        if (SessionWorker.DEBUG)
-            Log.d(SessionWorker.TAG, "processing event for ${session.key}; $timeLeft left")
-        launch { frameClock.startInteractive() }
+        // Wait until the Emittable tree has been processed at least once before receiving events.
+        uiReady.first { it }
+        // receiveEvents will suspend until the session is closed (usually due to widget deletion)
+        // or it is cancelled (in case of composition errors or timeout).
+        session.receiveEvents(context) {
+            // If time is running low, add time to make sure that we have time to respond to this
+            // event.
+            if (timeLeft < timeouts.additionalTime) addTime(timeouts.additionalTime)
+            if (SessionWorker.DEBUG)
+                Log.d(SessionWorker.TAG, "processing event for ${session.key}; $timeLeft left")
+            launch { frameClock.startInteractive() }
+        }
+    } finally {
+        composition.dispose()
+        frameClock.stopInteractive()
+        snapshotMonitor.cancel()
+        recomposer.cancel()
     }
-
-    composition.dispose()
-    frameClock.stopInteractive()
-    snapshotMonitor.cancel()
-    recomposer.close()
-    recomposer.join()
 }
 
 /**
diff --git a/glance/glance/src/test/kotlin/androidx/glance/session/SessionWorkerTest.kt b/glance/glance/src/test/kotlin/androidx/glance/session/SessionWorkerTest.kt
index 693218f..6fdf2ee 100644
--- a/glance/glance/src/test/kotlin/androidx/glance/session/SessionWorkerTest.kt
+++ b/glance/glance/src/test/kotlin/androidx/glance/session/SessionWorkerTest.kt
@@ -42,7 +42,10 @@
 import kotlinx.coroutines.flow.MutableSharedFlow
 import kotlinx.coroutines.flow.first
 import kotlinx.coroutines.launch
+import kotlinx.coroutines.sync.Mutex
+import kotlinx.coroutines.sync.withLock
 import kotlinx.coroutines.test.runTest
+import kotlinx.coroutines.yield
 import org.junit.Before
 import org.junit.Test
 import org.junit.runner.RunWith
@@ -297,16 +300,54 @@
         sessionManager.scope.startSession(context).first()
         sessionManager.scope.closeSession()
     }
+
+    @Test
+    fun sessionWorkerClosesWithTheLock() = runTest {
+        launch {
+            val result = worker.doWork()
+            assertThat(result).isEqualTo(Result.success())
+        }
+
+        val runError = mutableStateOf(false)
+        val resultFlow = sessionManager.runWithLock {
+            this as TestSessionManager.TestSessionManagerScope
+            startSession(context) {
+                Text("Hello")
+                if (runError.value) throw Throwable()
+            }
+        }
+        resultFlow.first { it.isSuccess }
+
+        // Trigger the composition error while this test holds the session manager lock.
+        sessionManager.runWithLock {
+            runError.value = true
+            resultFlow.first { it.isFailure }
+            // Composition is now cancelled due to the error. However, the worker should not be able
+            // to close the session until it has the lock. yield() here so that the worker runs
+            // until it suspends waiting for the lock.
+            yield()
+            val session = checkNotNull(getSession(SESSION_KEY))
+            assertThat(session.isOpen).isTrue()
+        }
+
+        // Now that the lock has been released, yield() again so that the worker can resume from
+        // waiting for the lock and close the session.
+        yield()
+        sessionManager.runWithLock {
+            val session = checkNotNull(getSession(SESSION_KEY))
+            assertThat(session.isOpen).isFalse()
+        }
+    }
 }
 
 private const val SESSION_KEY = "123"
 
 class TestSessionManager : SessionManager {
     val scope = TestSessionManagerScope()
-    // No locking needed, tests are run on single threaded environment and is only user of this
-    // SessionManager.
+    private val mutex = Mutex()
+
     override suspend fun <T> runWithLock(block: suspend SessionManagerScope.() -> T): T =
-        scope.block()
+        mutex.withLock { scope.block() }
 
     class TestSessionManagerScope : SessionManagerScope {
         private val sessions = mutableMapOf<String, Session>()