package androidx.compose.runtime.snapshots
import androidx.compose.runtime.AtomicReference
import androidx.compose.runtime.Composable
import androidx.compose.runtime.DisallowComposableCalls
import androidx.compose.runtime.ExperimentalComposeApi
import androidx.compose.runtime.InternalComposeApi
import androidx.compose.runtime.SnapshotThreadLocal
import androidx.compose.runtime.collection.IdentityArraySet
import androidx.compose.runtime.synchronized
import kotlin.contracts.ExperimentalContracts
import kotlin.contracts.InvocationKind
import kotlin.contracts.contract
import androidx.compose.runtime.internal.JvmDefaultWithCompatibility
* A snapshot of the values return by mutable states and other state objects. All state object
* will have the same value in the snapshot as they had when the snapshot was created unless they
* are explicitly changed in the snapshot.
* To enter a snapshot call [enter]. The snapshot is the current snapshot as returned by
* [currentSnapshot] until the control returns from the lambda (or until a nested [enter] is
* called). All state objects will return the values associated with this snapshot, locally in the
* thread, until [enter] returns. All other threads are unaffected.
* Snapshots can be nested by calling [takeNestedSnapshot].
* @see takeSnapshot
* @see takeMutableSnapshot
* @see androidx.compose.runtime.mutableStateOf
* @see androidx.compose.runtime.mutableStateListOf
* @see androidx.compose.runtime.mutableStateMapOf
sealed class Snapshot(
id: Int,
* A set of all the snapshots that should be treated as invalid.
internal open var invalid: SnapshotIdSet
) {
* The snapshot id of the snapshot. This is a unique number from a monotonically increasing
* value for each snapshot taken.
open var id: Int = id
internal set
* The root snapshot for this snapshot. For non-nested snapshots this is always `this`. For
* nested snapshot it is the parent's [root].
abstract val root: Snapshot
* True if any change to a state object in this snapshot will throw.
abstract val readOnly: Boolean
* Dispose the snapshot. Neglecting to dispose a snapshot will result in difficult to
* diagnose memory leaks as it indirectly causes all state objects to maintain its value for
* the un-disposed snapshot.
open fun dispose() {
disposed = true
sync {
* Take a snapshot of the state values in this snapshot. The resulting [Snapshot] is read-only.
* All nested snapshots need to be disposed by calling [dispose] before resources associated
* with this snapshot can be collected. Nested snapshots are still valid after the parent has
* been disposed.
abstract fun takeNestedSnapshot(readObserver: ((Any) -> Unit)? = null): Snapshot
* Whether there are any pending changes in this snapshot. These changes are not visible
* until the snapshot is applied.
abstract fun hasPendingChanges(): Boolean
* Enter the snapshot. In [block] all state objects have the value associated with this
* snapshot. The value of [currentSnapshot] will be this snapshot until this [block] returns
* or a nested call to [enter] is called. When [block] returns, the previous current snapshot
* is restored if there was one.
* All changes to state objects inside [block] are isolated to this snapshot and are not
* visible to other snapshot or as global state. If this is a [readOnly] snapshot, any
* changes to state objects will throw an [IllegalStateException].
* For a [MutableSnapshot], changes made to a snapshot inside [block] can be applied
* atomically to the global state (or to its parent snapshot if it is a nested snapshot) by
* calling [MutableSnapshot.apply].
* @see androidx.compose.runtime.mutableStateOf
* @see androidx.compose.runtime.mutableStateListOf
* @see androidx.compose.runtime.mutableStateMapOf
inline fun <T> enter(block: () -> T): T {
val previous = makeCurrent()
try {
return block()
} finally {
internal open fun makeCurrent(): Snapshot? {
val previous = threadSnapshot.get()
return previous
internal open fun restoreCurrent(snapshot: Snapshot?) {
* Enter the snapshot, returning the previous [Snapshot] for leaving this snapshot later
* using [unsafeLeave]. Prefer [enter] or [asContextElement] instead of using [unsafeEnter]
* directly to prevent mismatched [unsafeEnter]/[unsafeLeave] calls.
* After returning all state objects have the value associated with this snapshot.
* The value of [currentSnapshot] will be this snapshot until [unsafeLeave] is called
* with the returned [Snapshot] or another call to [unsafeEnter] or [enter]
* is made.
* All changes to state objects until another snapshot is entered or this snapshot is left
* are isolated to this snapshot and are not visible to other snapshot or as global state.
* If this is a [readOnly] snapshot, any changes to state objects will throw an
* [IllegalStateException].
* For a [MutableSnapshot], changes made to a snapshot can be applied
* atomically to the global state (or to its parent snapshot if it is a nested snapshot) by
* calling [MutableSnapshot.apply].
fun unsafeEnter(): Snapshot? = makeCurrent()
* Leave the snapshot, restoring the [oldSnapshot] before returning.
* See [unsafeEnter].
fun unsafeLeave(oldSnapshot: Snapshot?) {
check(threadSnapshot.get() === this) {
"Cannot leave snapshot; $this is not the current snapshot"
internal var disposed = false
* Handle to use when unpinning this snapshot. -1 if this snapshot has been unpinned.
private var pinningTrackingHandle =
if (id != INVALID_SNAPSHOT) trackPinning(id, invalid) else -1
internal inline val isPinned get() = pinningTrackingHandle >= 0
* The read observer for the snapshot if there is one.
internal abstract val readObserver: ((Any) -> Unit)?
* The write observer for the snapshot if there is one.
internal abstract val writeObserver: ((Any) -> Unit)?
* Called when a nested snapshot of this snapshot is activated
internal abstract fun nestedActivated(snapshot: Snapshot)
* Called when a nested snapshot of this snapshot is deactivated
internal abstract fun nestedDeactivated(snapshot: Snapshot)
* Record that state was modified in the snapshot.
internal abstract fun recordModified(state: StateObject)
* The set of state objects that have been modified in this snapshot.
internal abstract val modified: IdentityArraySet<StateObject>?
* Notify the snapshot that all objects created in this snapshot to this point should be
* considered initialized. If any state object is modified after this point it will
* appear as modified in the snapshot. Any applicable snapshot write observer will be
* called for the object and the object will be part of the a set of mutated objects sent to
* any applicable snapshot apply observer.
* Unless [notifyObjectsInitialized] is called, state objects created in a snapshot are not
* considered modified by the snapshot even if they are modified after construction.
internal abstract fun notifyObjectsInitialized()
* Closes the snapshot by removing the snapshot id (an any previous id's) from the list of
* open snapshots and unpinning snapshots that no longer are referenced by this snapshot.
internal fun closeAndReleasePinning() {
sync {
* Closes the snapshot by removing the snapshot id (and any previous ids) from the list of
* open snapshots. Does not release pinned snapshots. See [releasePinnedSnapshotsForCloseLocked]
* for the second half of [closeAndReleasePinning].
* Call while holding a `sync {}` lock.
internal open fun closeLocked() {
openSnapshots = openSnapshots.clear(id)
* Releases all pinned snapshots required to perform a clean [closeAndReleasePinning].
* Call while holding a `sync {}` lock.
* See [closeAndReleasePinning], [closeLocked].
internal open fun releasePinnedSnapshotsForCloseLocked() {
internal fun validateNotDisposed() {
require(!disposed) { "Cannot use a disposed snapshot" }
internal fun releasePinnedSnapshotLocked() {
if (pinningTrackingHandle >= 0) {
pinningTrackingHandle = -1
internal fun takeoverPinnedSnapshot(): Int =
pinningTrackingHandle.also { pinningTrackingHandle = -1 }
companion object {
* Return the thread's active snapshot. If no thread snapshot is active then the current
* global snapshot is used.
val current get() = currentSnapshot()
* Take a snapshot of the current value of all state objects. The values are preserved until
* [Snapshot.dispose] is called on the result.
* The [readObserver] parameter can be used to track when all state objects are read when in
* [Snapshot.enter]. A snapshot apply observer can be registered using
* [Snapshot.registerApplyObserver] to observe modification of state objects.
* An active snapshot (after it is created but before [Snapshot.dispose] is called) requires
* resources to track the values in the snapshot. Once a snapshot is no longer needed it
* should disposed by calling [Snapshot.dispose].
* Leaving a snapshot active could cause hard to diagnose memory leaks values as are
* maintained by state objects for these unneeded snapshots. Take care to always call
* [Snapshot.dispose] on all snapshots when they are no longer needed.
* Composition uses both of these to implicitly subscribe to changes to state object and
* automatically update the composition when state objects read during composition change.
* A nested snapshot can be taken of a snapshot which is an independent read-only copy of
* the snapshot and can be disposed independently. This is used by [takeSnapshot] when in
* a read-only snapshot for API consistency allowing the result of [takeSnapshot] to be
* disposed leaving the parent snapshot active.
* @param readObserver called when any state object is read in the lambda passed to
* [Snapshot.enter] or in the [Snapshot.enter] of any nested snapshot.
* @see Snapshot
* @see Snapshot.registerApplyObserver
fun takeSnapshot(
readObserver: ((Any) -> Unit)? = null
): Snapshot = currentSnapshot().takeNestedSnapshot(readObserver)
* Take a snapshot of the current value of all state objects that also allows the state
* to be changed and later atomically applied when [MutableSnapshot.apply] is called. The
* values are preserved until [Snapshot.dispose] is called on the result. The global
* state will either see all the changes made as one atomic change, when [MutableSnapshot
* .apply] is called, or none of the changes if the mutable state object is disposed
* before being applied.
* The values in a snapshot can be modified by calling [Snapshot.enter] and then, in its
* lambda, modify any state object. The new values of the state objects will only become
* visible to the global state when [MutableSnapshot.apply] is called.
* An active snapshot (after it is created but before [Snapshot.dispose] is called) requires
* resources to track the values in the snapshot. Once a snapshot is no longer needed it
* should disposed by calling [Snapshot.dispose].
* Leaving a snapshot active could cause hard to diagnose memory leaks as values are
* maintained by state objects for these unneeded snapshots. Take care to always call
* [Snapshot.dispose] on all snapshots when they are no longer needed.
* A nested snapshot can be taken by calling [Snapshot.takeNestedSnapshot], for a read-only
* snapshot, or [MutableSnapshot.takeNestedMutableSnapshot] for a snapshot that can be
* changed. Nested mutable snapshots are applied to the this, the parent snapshot, when
* their [MutableSnapshot.apply] is called. Their applied changes will be visible to in
* this snapshot but will not be visible other snapshots (including other nested
* snapshots) or the global state until this snapshot is applied by calling
* [MutableSnapshot.apply].
* Once [MutableSnapshot.apply] is called on this, the parent snapshot, all calls to
* [MutableSnapshot.apply] on an active nested snapshot will fail.
* Changes to a mutable snapshot are isolated, using snapshot isolation, from all other
* snapshots. Their changes are only visible as global state or to new snapshots once
* [MutableSnapshot.apply] is called.
* Applying a snapshot can fail if currently visible changes to the state object
* conflicts with a change made in the snapshot.
* When in a mutable snapshot, [takeMutableSnapshot] creates a nested snapshot of the
* current mutable snapshot. If the current snapshot is read-only, an exception is thrown.
* The current snapshot is the result of calling [currentSnapshot] which is updated by
* calling [Snapshot.enter] which makes the [Snapshot] the current snapshot while in its
* lambda.
* Composition uses mutable snapshots to allow changes made in a [Composable] functions
* to be temporarily isolated from the global state and is later applied to the global
* state when the composition is applied. If [MutableSnapshot.apply] fails applying this
* snapshot, the snapshot and the changes calculated during composition are disposed and
* a new composition is scheduled to be calculated again.
* @param readObserver called when any state object is read in the lambda passed to
* [Snapshot.enter] or in the [Snapshot.enter] of any nested snapshots.
* Composition, layout and draw use [readObserver] to implicitly subscribe to changes to
* state objects to know when to update.
* @param writeObserver called when a state object is created or just before it is
* written to the first time in the snapshot or a nested mutable snapshot. This might be
* called several times for the same object if nested mutable snapshots are created.
* Composition uses [writeObserver] to track when a state object is modified during
* composition in order to invalidate the reads that have not yet occurred. This allows a
* single pass of composition for state objects that are written to before they are read
* (such as modifying the value of a dynamic ambient provider).
* @see Snapshot.takeSnapshot
* @see Snapshot
* @see MutableSnapshot
fun takeMutableSnapshot(
readObserver: ((Any) -> Unit)? = null,
writeObserver: ((Any) -> Unit)? = null
): MutableSnapshot =
(currentSnapshot() as? MutableSnapshot)?.takeNestedMutableSnapshot(
) ?: error("Cannot create a mutable snapshot of an read-only snapshot")
* Escape the current snapshot, if there is one. All state objects will have the value
* associated with the global while the [block] lambda is executing.
* @return the result of [block]
inline fun <T> global(block: () -> T): T {
val previous = removeCurrent()
return block().also { restoreCurrent(previous) }
* Take a [MutableSnapshot] and run [block] within it. When [block] returns successfully,
* attempt to [MutableSnapshot.apply] the snapshot. Returns the result of [block] or throws
* [SnapshotApplyConflictException] if snapshot changes attempted by [block] could not be
* applied.
* Prior to returning, any changes made to snapshot state (e.g. state holders returned by
* [androidx.compose.runtime.mutableStateOf] are not visible to other threads. When
* [withMutableSnapshot] returns successfully those changes will be made visible to other
* threads and any snapshot observers (e.g. [androidx.compose.runtime.snapshotFlow]) will
* be notified of changes.
* [block] must not suspend if [withMutableSnapshot] is called from a suspend function.
// TODO: determine a good way to prevent/discourage suspending in an inlined [block]
inline fun <R> withMutableSnapshot(
block: () -> R
): R = takeMutableSnapshot().run {
try {
enter(block).also { apply().check() }
} finally {
* Observe reads and or write of state objects in the current thread.
* This only affects the current snapshot (if any) and any new snapshots create from
* [Snapshot.takeSnapshot] and [takeMutableSnapshot]. It will not affect any snapshots
* previous created even if [Snapshot.enter] is called in [block].
* @param readObserver called when any state object is read.
* @param writeObserver called when a state object is created or just before it is
* written to the first time in the snapshot or a nested mutable snapshot. This might be
* called several times for the same object if nested mutable snapshots are created.
fun <T> observe(
readObserver: ((Any) -> Unit)? = null,
writeObserver: ((Any) -> Unit)? = null,
block: () -> T
): T {
if (readObserver != null || writeObserver != null) {
val currentSnapshot = threadSnapshot.get()
val snapshot =
if (currentSnapshot == null || currentSnapshot is MutableSnapshot)
parentSnapshot = currentSnapshot as? MutableSnapshot,
specifiedReadObserver = readObserver,
specifiedWriteObserver = writeObserver,
mergeParentObservers = true,
ownsParentSnapshot = false
else if (readObserver == null) return block()
else currentSnapshot.takeNestedSnapshot(readObserver)
try {
return snapshot.enter(block)
} finally {
} else return block()
internal fun createNonObservableSnapshot(): Snapshot =
previousSnapshot = threadSnapshot.get()
* Passed [block] will be run with all the currently set snapshot read observers disabled.
@Suppress("BanInlineOptIn") // Treat Kotlin Contracts as non-experimental.
inline fun <T> withoutReadObservation(block: @DisallowComposableCalls () -> T): T {
contract { callsInPlace(block, InvocationKind.EXACTLY_ONCE) }
val snapshot = createNonObservableSnapshot()
try {
return snapshot.enter(block)
} finally {
* Register an apply listener that is called back when snapshots are applied to the
* global state.
* @return [ObserverHandle] to unregister [observer].
fun registerApplyObserver(observer: (Set<Any>, Snapshot) -> Unit): ObserverHandle {
// Ensure observer does not see changes before this call.
sync {
return ObserverHandle {
sync {
* Register an observer of the first write to the global state of a global state object
* since the last call to [sendApplyNotifications].
* Composition uses this to schedule a new composition whenever a state object that
* was read in composition is modified.
* State objects can be sent to the apply observer that have not been sent to global write
* observers. This happens for state objects inside [MutableSnapshot] that is later
* applied by calling [MutableSnapshot.apply].
* This should only be used to determine if a call to [sendApplyNotifications] should be
* scheduled to be called.
* @return [ObserverHandle] to unregister [observer].
fun registerGlobalWriteObserver(observer: ((Any) -> Unit)): ObserverHandle {
sync {
return ObserverHandle {
sync {
* Notify the snapshot that all objects created in this snapshot to this point should be
* considered initialized. If any state object is are modified passed this point it will
* appear as modified in the snapshot and any applicable snapshot write observer will be
* called for the object and the object will be part of the a set of mutated objects sent to
* any applicable snapshot apply observer.
* Unless [notifyObjectsInitialized] is called, state objects created in a snapshot are not
* considered modified by the snapshot even if they are modified after construction.
* Compose uses this between phases of composition to allow observing changes to state
* objects create in a previous phase.
fun notifyObjectsInitialized() = currentSnapshot().notifyObjectsInitialized()
* Send any pending apply notifications for state objects changed outside a snapshot.
* Apply notifications for state objects modified outside snapshot are deferred until method
* is called. This method is implicitly called whenever a non-nested [MutableSnapshot]
* is applied making its changes visible to all new, non-nested snapshots.
* Composition schedules this to be called after changes to state objects are
* detected an observer registered with [registerGlobalWriteObserver].
fun sendApplyNotifications() {
val changes = sync {
currentGlobalSnapshot.get().modified?.isNotEmpty() == true
if (changes)
fun openSnapshotCount() = openSnapshots.toList().size
internal fun removeCurrent(): Snapshot? {
val previous = threadSnapshot.get()
if (previous != null) threadSnapshot.set(null)
return previous
internal fun restoreCurrent(previous: Snapshot?) {
if (previous != null) threadSnapshot.set(previous)
* Pin the snapshot and invalid set.
* @return returns a handle that should be passed to [releasePinningLocked] when the snapshot closes or
* is disposed.
internal fun trackPinning(id: Int, invalid: SnapshotIdSet): Int {
val pinned = invalid.lowest(id)
return sync {
* Release the [handle] returned by [trackPinning]
internal fun releasePinningLocked(handle: Int) {
* A snapshot of the values return by mutable states and other state objects. All state object
* will have the same value in the snapshot as they had when the snapshot was created unless they
* are explicitly changed in the snapshot.
* To enter a snapshot call [enter]. The snapshot is the current snapshot as returned by
* [currentSnapshot] until the control returns from the lambda (or until a nested [enter] is
* called. All state objects will return the values associated with this snapshot, locally in the
* thread, until [enter] returns. All other threads are unaffected.
* All changes made in a [MutableSnapshot] are snapshot isolated from all other snapshots and
* their changes can only be seen globally, or by new shots, after [MutableSnapshot.apply] as been
* called.
* Snapshots can be nested by calling [takeNestedSnapshot] or
* [MutableSnapshot.takeNestedMutableSnapshot].
* @see Snapshot.takeMutableSnapshot
* @see androidx.compose.runtime.mutableStateOf
* @see androidx.compose.runtime.mutableStateListOf
* @see androidx.compose.runtime.mutableStateMapOf
open class MutableSnapshot internal constructor(
id: Int,
invalid: SnapshotIdSet,
override val readObserver: ((Any) -> Unit)?,
override val writeObserver: ((Any) -> Unit)?
) : Snapshot(id, invalid) {
* Whether there are any pending changes in this snapshot. These changes are not visible
* until the snapshot is applied.
override fun hasPendingChanges(): Boolean = modified?.isNotEmpty() == true
* Take a mutable snapshot of the state values in this snapshot. Entering this snapshot by
* calling [enter] allows state objects to be modified that are not visible to the this, the
* parent snapshot, until the [apply] is called.
* Applying a nested snapshot, by calling [apply], applies its change to, this, the parent
* snapshot. For a change to be visible globally, all the parent snapshots need to be applied
* until the root snapshot is applied to the global state.
* All nested snapshots need to be disposed by calling [dispose] before resources associated
* with this snapshot can be collected. Nested active snapshots are still valid after the parent
* has been disposed but calling [apply] will fail.
open fun takeNestedMutableSnapshot(
readObserver: ((Any) -> Unit)? = null,
writeObserver: ((Any) -> Unit)? = null
): MutableSnapshot {
return advance {
sync {
val newId = nextSnapshotId++
openSnapshots = openSnapshots.set(newId)
val currentInvalid = invalid
this.invalid = currentInvalid.set(newId)
currentInvalid.addRange(id + 1, newId),
mergedReadObserver(readObserver, this.readObserver),
mergedWriteObserver(writeObserver, this.writeObserver),
* Apply the changes made to state objects in this snapshot to the global state, or to the
* parent snapshot if this is a nested mutable snapshot.
* Once this method returns all changes made to this snapshot are atomically visible as the
* global state of the state object or to the parent snapshot.
* While a snapshot is active (after it is created but before [apply] or [dispose] is called)
* requires resources to track the values in the snapshot. Once a snapshot is no longer
* needed it should be either applied by calling [apply] or disposed by calling [dispose]. A
* snapshot that has been had is [apply] called can also have [dispose] called on it. However,
* calling [apply] after calling [dispose] will throw an exception.
* Leaving a snapshot active could cause hard to diagnose memory leaks values are maintained
* by state objects for unneeded snapshots. Take care to always call [dispose] on any snapshot.
open fun apply(): SnapshotApplyResult {
// NOTE: the this algorithm is currently does not guarantee serializable snapshots as it
// doesn't prevent crossing writes as described here
// Just removing the snapshot from the active snapshot set is enough to make it part of the
// next snapshot, however, this should only be done after first determining that there are no
// colliding writes are being applied.
// A write is considered colliding if any write occurred in a state object in a snapshot
// applied since the snapshot was taken.
val modified = modified
val optimisticMerges = if (modified != null) optimisticMerges(
) else null
val (observers, globalModified) = sync {
if (modified == null || modified.size == 0) {
val previousGlobalSnapshot = currentGlobalSnapshot.get()
takeNewGlobalSnapshot(previousGlobalSnapshot, emptyLambda)
val globalModified = previousGlobalSnapshot.modified
if (!globalModified.isNullOrEmpty())
applyObservers.toMutableList() to globalModified
emptyList<(Set<Any>, Snapshot) -> Unit>() to null
} else {
val previousGlobalSnapshot = currentGlobalSnapshot.get()
val result = innerApplyLocked(
if (result != SnapshotApplyResult.Success) return result
// Take a new global snapshot that includes this one.
takeNewGlobalSnapshot(previousGlobalSnapshot, emptyLambda)
val globalModified = previousGlobalSnapshot.modified
this.modified = null
previousGlobalSnapshot.modified = null
applyObservers.toMutableList() to globalModified
// Mark as applied
applied = true
// Notify any apply observers that changes applied were seen
if (!globalModified.isNullOrEmpty()) {
observers.fastForEach {
it(globalModified, this)
if (!modified.isNullOrEmpty()) {
observers.fastForEach {
it(modified, this)
// Wait to release pinned snapshots until after running observers.
// This permits observers to safely take a nested snapshot of the one that was just applied
// before unpinning records that need to be retained in this case.
sync {
globalModified?.fastForEach { processForUnusedRecordsLocked(it) }
modified?.fastForEach { processForUnusedRecordsLocked(it) }
merged?.fastForEach { processForUnusedRecordsLocked(it) }
merged = null
return SnapshotApplyResult.Success
override val readOnly: Boolean get() = false
override val root: Snapshot get() = this
override fun dispose() {
if (!disposed) {
override fun takeNestedSnapshot(readObserver: ((Any) -> Unit)?): Snapshot {
val previousId = id
return advance {
sync {
val readonlyId = nextSnapshotId++
openSnapshots = openSnapshots.set(readonlyId)
invalid.addRange(previousId + 1, readonlyId),
override fun nestedActivated(snapshot: Snapshot) { snapshots++ }
override fun nestedDeactivated(snapshot: Snapshot) {
require(snapshots > 0)
if (--snapshots == 0) {
if (!applied) {
override fun notifyObjectsInitialized() {
if (applied || disposed) return
override fun closeLocked() {
// Remove itself and previous ids from the open set.
openSnapshots = openSnapshots.clear(id).andNot(previousIds)
override fun releasePinnedSnapshotsForCloseLocked() {
private fun validateNotApplied() {
check(!applied) {
"Unsupported operation on a snapshot that has been applied"
private fun validateNotAppliedOrPinned() {
check(!applied || isPinned) {
"Unsupported operation on a disposed or applied snapshot"
* Abandon the snapshot. This does NOT [closeAndReleasePinning], which must be done
* as an additional step by callers.
private fun abandon() {
val modified = modified
if (modified != null) {
// Mark all state records created in this snapshot as invalid. This allows the snapshot
// id to be forgotten as no state records will refer to it.
this.modified = null
val id = id
for (state in modified) {
var current: StateRecord? = state.firstStateRecord
while (current != null) {
if (current.snapshotId == id || current.snapshotId in previousIds) {
current.snapshotId = INVALID_SNAPSHOT
current =
// The snapshot can now be closed.
internal fun innerApplyLocked(
snapshotId: Int,
optimisticMerges: Map<StateRecord, StateRecord>?,
invalidSnapshots: SnapshotIdSet
): SnapshotApplyResult {
// This must be called in a synchronized block
// If there are modifications we need to ensure none of the modifications have
// collisions.
// A record is guaranteed not collide if no other write was performed to the record
// by an applied snapshot since this snapshot was taken. No writes to a state object
// occurred if, ignoring this snapshot, the readable records for the snapshots are
// the same. If they are different then there is a potential collision and the state
// object is asked if it can resolve the collision. If it can the updated state record
// is for the apply.
var mergedRecords: MutableList<Pair<StateObject, StateRecord>>? = null
val start = this.invalid.set(id).or(this.previousIds)
val modified = modified!!
var statesToRemove: MutableList<StateObject>? = null
for (state in modified) {
val first = state.firstStateRecord
// If either current or previous cannot be calculated the object was created
// in a nested snapshot that was committed then changed.
val current = readable(first, snapshotId, invalidSnapshots) ?: continue
val previous = readable(first, id, start) ?: continue
if (current != previous) {
val applied = readable(first, id, this.invalid) ?: readError()
val merged = optimisticMerges?.get(current) ?: run {
state.mergeRecords(previous, current, applied)
when (merged) {
null -> return SnapshotApplyResult.Failure(this)
applied -> {
// Nothing to do the merge policy says that the current changes
// obscure the current value so ignore the conflict
current -> {
mergedRecords ?: mutableListOf<Pair<StateObject, StateRecord>>().also {
mergedRecords = it
).add(state to current.create())
// If we revert to current then the state is no longer modified.
statesToRemove ?: mutableListOf<StateObject>().also {
statesToRemove = it
else -> {
mergedRecords ?: mutableListOf<Pair<StateObject, StateRecord>>().also {
mergedRecords = it
if (merged != previous) state to merged
else state to previous.create()
mergedRecords?.let {
// Ensure we have a new snapshot id
// Update all the merged records to have the new id.
it.fastForEach { merged ->
val (state, stateRecord) = merged
stateRecord.snapshotId = id
sync { = state.firstStateRecord
statesToRemove?.let { list ->
list.fastForEach { modified.remove(it) }
val mergedList = merged
merged = if (mergedList == null) list else mergedList + list
return SnapshotApplyResult.Success
internal inline fun <T> advance(block: () -> T): T {
return block().also {
// Only advance this snapshot if it's possible for it to be applied later,
// otherwise we don't need to bother.
// This simplifies tracking of open snapshots when an apply observer takes
// a nested snapshot of the snapshot that was just applied.
if (!applied && !disposed) {
val previousId = id
sync {
id = nextSnapshotId++
openSnapshots = openSnapshots.set(id)
invalid = invalid.addRange(previousId + 1, id)
internal fun advance(): Unit = advance { }
internal fun recordPrevious(id: Int) {
sync {
previousIds = previousIds.set(id)
internal fun recordPreviousPinnedSnapshot(id: Int) {
if (id >= 0)
previousPinnedSnapshots += id
internal fun recordPreviousPinnedSnapshots(handles: IntArray) {
// Avoid unnecessary copies implied by the `+` below.
if (handles.isEmpty()) return
val pinned = previousPinnedSnapshots
previousPinnedSnapshots = if (pinned.isEmpty()) handles else pinned + handles
private fun releasePreviouslyPinnedSnapshotsLocked() {
for (index in previousPinnedSnapshots.indices) {
internal fun recordPreviousList(snapshots: SnapshotIdSet) {
sync {
previousIds = previousIds.or(snapshots)
override fun recordModified(state: StateObject) {
(modified ?: IdentityArraySet<StateObject>().also { modified = it }).add(state)
override var modified: IdentityArraySet<StateObject>? = null
internal var merged: List<StateObject>? = null
* A set of the id's previously associated with this snapshot. When this snapshot closes
* then these ids must be removed from the global as well.
internal var previousIds: SnapshotIdSet = SnapshotIdSet.EMPTY
* A list of the pinned snapshots handles that must be released by this snapshot
internal var previousPinnedSnapshots: IntArray = EmptyIntArray
* The number of pending nested snapshots of this snapshot. To simplify the code, this
* snapshot it, itself, counted as its own nested snapshot.
private var snapshots = 1
* Tracks whether the snapshot has been applied.
internal var applied = false
private companion object {
private val EmptyIntArray = IntArray(0)
* The result of a applying a mutable snapshot. [Success] indicates that the snapshot was
* successfully applied and is now visible as the global state of the state object (or visible
* in the parent snapshot for a nested snapshot). [Failure] indicates one or more state objects
* were modified by both this snapshot and in the global (or parent) snapshot, and the changes from
* this snapshot are **not** visible in the global or parent snapshot.
sealed class SnapshotApplyResult {
* Check the result of an apply. If the result is [Success] then this does does nothing. If
* the result is [Failure] then a [SnapshotApplyConflictException] exception is thrown. Once
* [check] as been called the snapshot is disposed.
abstract fun check()
* True if the result is [Success].
abstract val succeeded: Boolean
object Success : SnapshotApplyResult() {
* Check the result of a snapshot apply. Calling [check] on a [Success] result is a noop.
override fun check() { }
override val succeeded: Boolean get() = true
class Failure(val snapshot: Snapshot) : SnapshotApplyResult() {
* Check the result of a snapshot apply. Calling [check] on a [Failure] result throws a
* [SnapshotApplyConflictException] exception.
override fun check() {
throw SnapshotApplyConflictException(snapshot)
override val succeeded: Boolean get() = false
* The type returned by observer registration methods that unregisters the observer when it is
* disposed.
fun interface ObserverHandle {
* Dispose the observer causing it to be unregistered from the snapshot system.
fun dispose()
* Return the thread's active snapshot. If no thread snapshot is active then the current global
* snapshot is used.
internal fun currentSnapshot(): Snapshot =
threadSnapshot.get() ?: currentGlobalSnapshot.get()
* An exception that is thrown when [SnapshotApplyResult.check] is called on a result of a
* [MutableSnapshot.apply] that fails to apply.
class SnapshotApplyConflictException(
@Suppress("unused") val snapshot: Snapshot
) : Exception()
* Snapshot local value of a state object.
abstract class StateRecord {
* The snapshot id of the snapshot in which the record was created.
internal var snapshotId: Int = currentSnapshot().id
* Reference of the next state record. State records are stored in a linked list.
* Changes to [next] must preserve all existing records to all threads even during
* intermediately changes. For example, it is safe to add the beginning or end of the list
* but adding to the middle requires care. First the new record must have its [next] updated
* then the [next] of its new predecessor can then be set to point to it. This implies that
* records that are already in the list cannot be moved in the list as this the change must
* be atomic to all threads that cannot happen without a lock which this list cannot afford.
* It is unsafe to remove a record as it might be in the process of being reused (see [usedLocked]).
* If a record is removed care must be taken to ensure that it is not being claimed by some
* other thread. This would require changes to [usedLocked].
internal var next: StateRecord? = null
* Copy the value into this state record from another for the same state object.
abstract fun assign(value: StateRecord)
* Create a new state record for the same state object.
abstract fun create(): StateRecord
* Interface implemented by all snapshot aware state objects. Used by this module to maintain the
* state records of a state object.
interface StateObject {
* The first state record in a linked list of state records.
val firstStateRecord: StateRecord
* Add a new state record to the beginning of a list. After this call [firstStateRecord] should
* be [value].
fun prependStateRecord(value: StateRecord)
* Produce a merged state based on the conflicting state changes.
* This method must not modify any of the records received and should treat the state records
* as immutable, even the [applied] record.
* @param previous the state record that was used to create the [applied] record and is a state
* that also (though indirectly) produced the [current] record.
* @param current the state record of the parent snapshot or global state.
* @param applied the state record that is being applied of the parent snapshot or global
* state.
* @return the modified state or `null` if the values cannot be merged. If the states cannot
* be merged the current apply will fail. Any of the parameters can be returned as a result.
* If it is not one of the parameter values then it *must* be a new value that is created by
* calling [StateRecord.create] on one of the records passed and then can be modified
* to have the merged value before being returned. If a new record is returned
* [MutableSnapshot.apply] will update the internal snapshot id and call
* [prependStateRecord] if the record is used.
fun mergeRecords(
previous: StateRecord,
current: StateRecord,
applied: StateRecord
): StateRecord? = null
* A snapshot whose state objects cannot be modified. If a state object is modified when in a
* read-only snapshot a [IllegalStateException] is thrown.
internal class ReadonlySnapshot internal constructor(
id: Int,
invalid: SnapshotIdSet,
override val readObserver: ((Any) -> Unit)?
) : Snapshot(id, invalid) {
* The number of nested snapshots that are active. To simplify the code, this snapshot counts
* itself as a nested snapshot.
private var snapshots = 1
override val readOnly: Boolean get() = true
override val root: Snapshot get() = this
override fun hasPendingChanges(): Boolean = false
override val writeObserver: ((Any) -> Unit)? get() = null
override var modified: IdentityArraySet<StateObject>?
get() = null
set(value) = unsupported()
override fun takeNestedSnapshot(readObserver: ((Any) -> Unit)?): Snapshot {
return NestedReadonlySnapshot(id, invalid, readObserver, this)
override fun notifyObjectsInitialized() {
// Nothing to do for read-only snapshots
override fun dispose() {
if (!disposed) {
override fun nestedActivated(snapshot: Snapshot) { snapshots++ }
override fun nestedDeactivated(snapshot: Snapshot) {
if (--snapshots == 0) {
// A read-only snapshot can be just be closed as it has no modifications.
override fun recordModified(state: StateObject) {
internal class NestedReadonlySnapshot(
id: Int,
invalid: SnapshotIdSet,
readObserver: ((Any) -> Unit)?,
val parent: Snapshot
) : Snapshot(id, invalid) {
init { parent.nestedActivated(this) }
override val readOnly get() = true
override val root: Snapshot get() = parent.root
override fun takeNestedSnapshot(readObserver: ((Any) -> Unit)?) =
NestedReadonlySnapshot(id, invalid, readObserver, parent)
override fun notifyObjectsInitialized() {
// Nothing to do for read-only snapshots
override fun hasPendingChanges(): Boolean = false
override val readObserver: ((Any) -> Unit)? =
// Merge the read observers if necessary
readObserver?.let {
parent.readObserver?.let {
{ state: Any ->
} ?: readObserver
} ?: parent.readObserver
override fun dispose() {
if (!disposed) {
if (id != {
override val modified: IdentityArraySet<StateObject>? get() = null
override val writeObserver: ((Any) -> Unit)? get() = null
override fun recordModified(state: StateObject) = reportReadonlySnapshotWrite()
override fun nestedDeactivated(snapshot: Snapshot) = unsupported()
override fun nestedActivated(snapshot: Snapshot) = unsupported()
private val emptyLambda: (invalid: SnapshotIdSet) -> Unit = { }
* A snapshot object that simplifies the code by treating the global state as a mutable snapshot.
internal class GlobalSnapshot(id: Int, invalid: SnapshotIdSet) :
id, invalid, null,
sync {
// Take a defensive copy of the globalWriteObservers list. This then avoids having to
// synchronized access to writerObserver in places it is called and allows the list to
// change while notifications are being dispatched. Changes to globalWriteObservers force
// a new global snapshot to be created.
if (globalWriteObservers.isNotEmpty()) {
} else null
)?.let {
it.singleOrNull() ?: { state: Any ->
it.fastForEach { it(state) }
) {
override fun takeNestedSnapshot(readObserver: ((Any) -> Unit)?): Snapshot =
takeNewSnapshot { invalid ->
id = sync { nextSnapshotId++ },
invalid = invalid,
readObserver = readObserver
override fun takeNestedMutableSnapshot(
readObserver: ((Any) -> Unit)?,
writeObserver: ((Any) -> Unit)?
): MutableSnapshot = takeNewSnapshot { invalid ->
id = sync { nextSnapshotId++ },
invalid = invalid,
// It is intentional that the global read observers are not merged with mutable
// snapshots read observers.
readObserver = readObserver,
// It is intentional that global write observers are not merged with mutable
// snapshots write observers.
writeObserver = writeObserver
override fun notifyObjectsInitialized() {
override fun nestedDeactivated(snapshot: Snapshot) = unsupported()
override fun nestedActivated(snapshot: Snapshot) = unsupported()
override fun apply(): SnapshotApplyResult =
error("Cannot apply the global snapshot directly. Call Snapshot.advanceGlobalSnapshot")
override fun dispose() {
sync {
* A nested mutable snapshot created by [MutableSnapshot.takeNestedMutableSnapshot].
internal class NestedMutableSnapshot(
id: Int,
invalid: SnapshotIdSet,
readObserver: ((Any) -> Unit)?,
writeObserver: ((Any) -> Unit)?,
val parent: MutableSnapshot
) : MutableSnapshot(id, invalid, readObserver, writeObserver) {
private var deactivated = false
init { parent.nestedActivated(this) }
override val root: Snapshot get() = parent.root
override fun dispose() {
if (!disposed) {
override fun apply(): SnapshotApplyResult {
if (parent.applied || parent.disposed) return SnapshotApplyResult.Failure(this)
// Applying a nested mutable snapshot applies its changes to the parent snapshot.
// See MutableSnapshot.apply() for implantation notes.
// The apply observer notification are for applying to the global scope so it is elided
// here making this code a bit simpler than MutableSnapshot.apply.
val modified = modified
val id = id
val optimisticMerges = if (modified != null)
optimisticMerges(parent, this, parent.invalid)
else null
sync {
if (modified == null || modified.size == 0) {
} else {
val result = innerApplyLocked(, optimisticMerges, parent.invalid)
if (result != SnapshotApplyResult.Success) return result
// Add all modified objects in this set to the parent
parent.modified ?: IdentityArraySet<StateObject>().also {
parent.modified = it
// Ensure the parent is newer than the current snapshot
if ( < id) {
// Make the snapshot visible in the parent snapshot
parent.invalid = parent.invalid.clear(id).andNot(previousIds)
// Ensure the ids associated with this snapshot are also applied by the parent.
applied = true
return SnapshotApplyResult.Success
private fun deactivate() {
if (!deactivated) {
deactivated = true
* A pseudo snapshot that doesn't introduce isolation but does introduce observers.
internal class TransparentObserverMutableSnapshot(
private val parentSnapshot: MutableSnapshot?,
specifiedReadObserver: ((Any) -> Unit)?,
specifiedWriteObserver: ((Any) -> Unit)?,
private val mergeParentObservers: Boolean,
private val ownsParentSnapshot: Boolean
) : MutableSnapshot(
parentSnapshot?.readObserver ?: currentGlobalSnapshot.get().readObserver,
parentSnapshot?.writeObserver ?: currentGlobalSnapshot.get().writeObserver
) {
private val currentSnapshot: MutableSnapshot
get() = parentSnapshot ?: currentGlobalSnapshot.get()
override fun dispose() {
// Explicitly don't call super.dispose()
disposed = true
if (ownsParentSnapshot) {
override var id: Int
get() =
set(value) { unsupported() }
override var invalid get() = currentSnapshot.invalid
set(value) = unsupported()
override fun hasPendingChanges(): Boolean = currentSnapshot.hasPendingChanges()
override var modified: IdentityArraySet<StateObject>?
get() = currentSnapshot.modified
set(value) = unsupported()
override val readOnly: Boolean
get() = currentSnapshot.readOnly
override fun apply(): SnapshotApplyResult =
override fun recordModified(state: StateObject) =
override fun takeNestedSnapshot(readObserver: ((Any) -> Unit)?): Snapshot {
val mergedReadObserver = mergedReadObserver(readObserver, this.readObserver)
return if (!mergeParentObservers) {
previousSnapshot = currentSnapshot.takeNestedSnapshot(null),
readObserver = mergedReadObserver,
ownsPreviousSnapshot = true
} else {
override fun takeNestedMutableSnapshot(
readObserver: ((Any) -> Unit)?,
writeObserver: ((Any) -> Unit)?
): MutableSnapshot {
val mergedReadObserver = mergedReadObserver(readObserver, this.readObserver)
val mergedWriteObserver = mergedWriteObserver(writeObserver, this.writeObserver)
return if (!mergeParentObservers) {
val nestedSnapshot = currentSnapshot.takeNestedMutableSnapshot(
readObserver = null,
writeObserver = mergedWriteObserver
parentSnapshot = nestedSnapshot,
specifiedReadObserver = mergedReadObserver,
specifiedWriteObserver = mergedWriteObserver,
mergeParentObservers = false,
ownsParentSnapshot = true
} else {
override fun notifyObjectsInitialized() = currentSnapshot.notifyObjectsInitialized()
/** Should never be called. */
override fun nestedActivated(snapshot: Snapshot) = unsupported()
override fun nestedDeactivated(snapshot: Snapshot) = unsupported()
* A pseudo snapshot that doesn't introduce isolation but does introduce observers.
internal class TransparentObserverSnapshot(
private val previousSnapshot: Snapshot?,
specifiedReadObserver: ((Any) -> Unit)?,
private val mergeParentObservers: Boolean,
private val ownsPreviousSnapshot: Boolean
) : Snapshot(
) {
override val readObserver: ((Any) -> Unit)? = mergedReadObserver(
previousSnapshot?.readObserver ?: currentGlobalSnapshot.get().readObserver,
override val writeObserver: ((Any) -> Unit)? = null
override val root: Snapshot = this
private val currentSnapshot: Snapshot
get() = previousSnapshot ?: currentGlobalSnapshot.get()
override fun dispose() {
// Explicitly don't call super.dispose()
disposed = true
if (ownsPreviousSnapshot) {
override var id: Int
get() =
set(value) { unsupported() }
override var invalid get() = currentSnapshot.invalid
set(value) = unsupported()
override fun hasPendingChanges(): Boolean = currentSnapshot.hasPendingChanges()
override var modified: IdentityArraySet<StateObject>?
get() = currentSnapshot.modified
set(value) = unsupported()
override val readOnly: Boolean
get() = currentSnapshot.readOnly
override fun recordModified(state: StateObject) =
override fun takeNestedSnapshot(readObserver: ((Any) -> Unit)?): Snapshot {
val mergedReadObserver = mergedReadObserver(readObserver, this.readObserver)
return if (!mergeParentObservers) {
ownsPreviousSnapshot = true
} else {
override fun notifyObjectsInitialized() = currentSnapshot.notifyObjectsInitialized()
/** Should never be called. */
override fun nestedActivated(snapshot: Snapshot) = unsupported()
override fun nestedDeactivated(snapshot: Snapshot) = unsupported()
private fun createTransparentSnapshotWithNoParentReadObserver(
previousSnapshot: Snapshot?,
readObserver: ((Any) -> Unit)? = null,
ownsPreviousSnapshot: Boolean = false
): Snapshot = if (previousSnapshot is MutableSnapshot || previousSnapshot == null) {
parentSnapshot = previousSnapshot as? MutableSnapshot,
specifiedReadObserver = readObserver,
specifiedWriteObserver = null,
mergeParentObservers = false,
ownsParentSnapshot = ownsPreviousSnapshot
} else {
previousSnapshot = previousSnapshot,
specifiedReadObserver = readObserver,
mergeParentObservers = false,
ownsPreviousSnapshot = ownsPreviousSnapshot
private fun mergedReadObserver(
readObserver: ((Any) -> Unit)?,
parentObserver: ((Any) -> Unit)?,
mergeReadObserver: Boolean = true
): ((Any) -> Unit)? {
val parentObserver = if (mergeReadObserver) parentObserver else null
return if (readObserver != null && parentObserver != null && readObserver != parentObserver) {
{ state: Any ->
} else readObserver ?: parentObserver
private fun mergedWriteObserver(
writeObserver: ((Any) -> Unit)?,
parentObserver: ((Any) -> Unit)?
): ((Any) -> Unit)? =
if (writeObserver != null && parentObserver != null && writeObserver != parentObserver) {
{ state: Any ->
} else writeObserver ?: parentObserver
* Snapshot id of `0` is reserved as invalid and no state record with snapshot `0` is considered
* valid.
* The value `0` was chosen as it is the default value of the Int snapshot id type and records
* initially created will naturally have a snapshot id of 0. If this wasn't considered invalid
* adding such a record to a state object will make the state record immediately visible to the
* snapshots instead of being born invalid. Using `0` ensures all state records are created invalid
* and must be explicitly marked as valid in to be visible in a snapshot.
private const val INVALID_SNAPSHOT = 0
* Current thread snapshot
private val threadSnapshot = SnapshotThreadLocal<Snapshot>()
* A global synchronization object. This synchronization object should be taken before modifying any
* of the fields below.
internal val lock = Any()
internal inline fun <T> sync(block: () -> T): T = synchronized(lock, block)
// The following variables should only be written when sync is taken
* A set of snapshots that are currently open and should be considered invalid for new snapshots.
private var openSnapshots = SnapshotIdSet.EMPTY
/** The first snapshot created must be at least on more than the INVALID_SNAPSHOT */
private var nextSnapshotId = INVALID_SNAPSHOT + 1
* A tracking table for pinned snapshots. A pinned snapshot is the lowest snapshot id that the
* snapshot is ignoring by considering them invalid. This is used to calculate when a snapshot
* record can be reused.
private val pinningTable = SnapshotDoubleIndexHeap()
* The set of objects who have more than one active state record. These are traversed during
* apply of mutable snapshots and when the global snapshot is advanced to determine if any of the
* records can be cleared.
private val extraStateObjects = SnapshotWeakSet<StateObject>()
/** A list of apply observers */
private val applyObservers = mutableListOf<(Set<Any>, Snapshot) -> Unit>()
/** A list of observers of writes to the global state. */
private val globalWriteObservers = mutableListOf<((Any) -> Unit)>()
private val currentGlobalSnapshot = AtomicReference(
id = nextSnapshotId++,
invalid = SnapshotIdSet.EMPTY
).also {
openSnapshots = openSnapshots.set(
* A value to use to initialize the snapshot local variable of writable below. The value of this
* doesn't matter as it is just used to initialize the local that is immediately overwritten by
* Snapshot.current. This is done to avoid a compiler error complaining that the var has not been
* initialized. This can be removed once contracts are out of experimental; then we can mark sync
* with the correct contracts so the compiler would be able to figure out that the variable is
* initialized.
internal val snapshotInitializer: Snapshot = currentGlobalSnapshot.get()
private fun <T> takeNewGlobalSnapshot(
previousGlobalSnapshot: Snapshot,
block: (invalid: SnapshotIdSet) -> T
): T {
// Deactivate global snapshot. It is safe to just deactivate it because it cannot have
// any conflicting writes as it is always closed before another snapshot is taken.
val result = block(openSnapshots.clear(
sync {
val globalId = nextSnapshotId++
openSnapshots = openSnapshots.clear(
id = globalId,
invalid = openSnapshots
openSnapshots = openSnapshots.set(globalId)
return result
private fun <T> advanceGlobalSnapshot(block: (invalid: SnapshotIdSet) -> T): T {
var previousGlobalSnapshot = snapshotInitializer as GlobalSnapshot
val result = sync {
previousGlobalSnapshot = currentGlobalSnapshot.get()
takeNewGlobalSnapshot(previousGlobalSnapshot, block)
// If the previous global snapshot had any modified states then notify the registered apply
// observers.
val modified = previousGlobalSnapshot.modified
if (modified != null) {
val observers: List<(Set<Any>, Snapshot) -> Unit> = sync { applyObservers.toMutableList() }
observers.fastForEach { observer ->
observer(modified, previousGlobalSnapshot)
sync {
modified?.fastForEach { processForUnusedRecordsLocked(it) }
return result
private fun advanceGlobalSnapshot() = advanceGlobalSnapshot { }
private fun <T : Snapshot> takeNewSnapshot(block: (invalid: SnapshotIdSet) -> T): T =
advanceGlobalSnapshot { invalid ->
val result = block(invalid)
sync {
openSnapshots = openSnapshots.set(
private fun validateOpen(snapshot: Snapshot) {
if (!openSnapshots.get( error("Snapshot is not open")
* A candidate snapshot is valid if the it is less than or equal to the current snapshot
* and it wasn't specifically marked as invalid when the snapshot started.
* All snapshot active at when the snapshot was taken considered invalid for the snapshot
* (they have not been applied and therefore are considered invalid).
* All snapshots taken after the current snapshot are considered invalid since they where taken
* after the current snapshot was taken.
* INVALID_SNAPSHOT is reserved as an invalid snapshot id.
private fun valid(currentSnapshot: Int, candidateSnapshot: Int, invalid: SnapshotIdSet): Boolean {
return candidateSnapshot != INVALID_SNAPSHOT && candidateSnapshot <= currentSnapshot &&
// Determine if the given data is valid for the snapshot.
private fun valid(data: StateRecord, snapshot: Int, invalid: SnapshotIdSet): Boolean {
return valid(snapshot, data.snapshotId, invalid)
private fun <T : StateRecord> readable(r: T, id: Int, invalid: SnapshotIdSet): T? {
// The readable record is the valid record with the highest snapshotId
var current: StateRecord? = r
var candidate: StateRecord? = null
while (current != null) {
if (valid(current, id, invalid)) {
candidate = if (candidate == null) current
else if (candidate.snapshotId < current.snapshotId) current else candidate
current =
if (candidate != null) {
return candidate as T
return null
* Return the current readable state record for the current snapshot. It is assumed that [this]
* is the first record of [state]
fun <T : StateRecord> T.readable(state: StateObject): T {
val snapshot = Snapshot.current
return readable(this,, snapshot.invalid) ?: sync {
// Readable can return null when the global snapshot has been advanced by another thread
// and state written to the object was overwritten while this thread was paused. Repeating
// the read is valid here as either this will return the same result as the previous call
// or will find a valid record. Being in a sync block prevents other threads from writing
// to this state object until the read completes.
val syncSnapshot = Snapshot.current
readable(state.firstStateRecord as T,, syncSnapshot.invalid) ?: readError()
* Return the current readable state record for the [snapshot]. It is assumed that [this]
* is the first record of [state]
fun <T : StateRecord> T.readable(state: StateObject, snapshot: Snapshot): T {
// invoke the observer associated with the current snapshot.
return readable(this,, snapshot.invalid) ?: readError()
private fun readError(): Nothing {
"Reading a state that was created after the snapshot was taken or in a snapshot that " +
"has not yet been applied"
* A record can be reused if no other snapshot will see it as valid. This is always true for a
* record created in an abandoned snapshot. It is also true if the record is valid in the
* previous snapshot and is obscured by another record also valid in the previous state record.
private fun usedLocked(state: StateObject): StateRecord? {
var current: StateRecord? = state.firstStateRecord
var validRecord: StateRecord? = null
val reuseLimit = pinningTable.lowestOrDefault(nextSnapshotId) - 1
val invalid = SnapshotIdSet.EMPTY
while (current != null) {
val currentId = current.snapshotId
if (currentId == INVALID_SNAPSHOT) {
// Any records that were marked invalid by an abandoned snapshot or is marked reachable
// can be used immediately.
return current
if (valid(current, reuseLimit, invalid)) {
if (validRecord == null) {
validRecord = current
} else {
// If we have two valid records one must obscure the other. Return the
// record with the lowest id
return if (current.snapshotId < validRecord.snapshotId) current else validRecord
current =
return null
* Clear records that cannot be selected in any currently open snapshot.
* This method uses the same technique as [usedLocked] which uses the [pinningTable] to
* determine lowest id in the invalid set for all snapshots. Only the record with the greatest
* id of all records less or equal to this lowest id can possibly be selected in any snapshot
* and all other records below that number can be overwritten.
* However, this technique doesn't find all records that will not be selected by any open snapshot
* as a record that has an id above that number could be reusable but will not be found.
* For example if snapshot 1 is open and 2 is created and modifies [state] then is applied, 3 is
* open and then 4 is open, and then 1 is applied. When 3 modifies [state] and then applies, as 1 is
* pinned by 4, it is uncertain whether the record for 2 is needed by 4 so it must be kept even if 4
* also modified [state] and would not select 2. Accurately determine if a record is selectable
* would require keeping a list of all open [Snapshot] instances which currently is not kept and
* traversing that list for each record.
* If any such records are possible this method returns true. In other words, this method returns
* true if any records might be reusable but this function could not prove there were or not.
private fun overwriteUnusedRecordsLocked(state: StateObject): Boolean {
var current: StateRecord? = state.firstStateRecord
var overwriteRecord: StateRecord? = null
var validRecord: StateRecord? = null
val reuseLimit = pinningTable.lowestOrDefault(nextSnapshotId)
var retainedRecords = 0
while (current != null) {
val currentId = current.snapshotId
if (currentId != INVALID_SNAPSHOT) {
if (currentId < reuseLimit) {
if (validRecord == null) {
// If any records are below [reuseLimit] then we must keep the highest one
// so the lowest snapshot can select it.
validRecord = current
} else {
// If [validRecord] is from an earlier snapshot, overwrite it instead
val recordToOverwrite = if (current.snapshotId < validRecord.snapshotId) {
} else {
// We cannot use `.also { }` here as it prevents smart casting of other
// uses of [validRecord].
val result = validRecord
validRecord = current
if (overwriteRecord == null) {
// Find a record we will definitely keep
overwriteRecord = state.firstStateRecord.findYoungestOr {
it.snapshotId >= reuseLimit
recordToOverwrite.snapshotId = INVALID_SNAPSHOT
} else {
current =
return retainedRecords > 1
private inline fun StateRecord.findYoungestOr(predicate: (StateRecord) -> Boolean): StateRecord {
var current: StateRecord? = this
var youngest = this
while (current != null) {
if (predicate(current)) return current
if (youngest.snapshotId < current.snapshotId)
youngest = current
current =
return youngest
private fun checkAndOverwriteUnusedRecordsLocked() {
extraStateObjects.removeIf { !overwriteUnusedRecordsLocked(it) }
private fun processForUnusedRecordsLocked(state: StateObject) {
if (overwriteUnusedRecordsLocked(state)) {
internal fun <T : StateRecord> T.writableRecord(state: StateObject, snapshot: Snapshot): T {
if (snapshot.readOnly) {
// If the snapshot is read-only, use the snapshot recordModified to report it.
val id =
val readData = readable(this, id, snapshot.invalid) ?: readError()
// If the readable data was born in this snapshot, it is writable.
if (readData.snapshotId == return readData
// Otherwise, make a copy of the readable data and mark it as born in this snapshot, making it
// writable.
val newData = readData.newWritableRecord(state, snapshot)
return newData
internal fun <T : StateRecord> T.overwritableRecord(
state: StateObject,
snapshot: Snapshot,
candidate: T
): T {
if (snapshot.readOnly) {
// If the snapshot is read-only, use the snapshot recordModified to report it.
val id =
if (candidate.snapshotId == id) return candidate
val newData = sync { newOverwritableRecordLocked(state) }
newData.snapshotId = id
return newData
internal fun <T : StateRecord> T.newWritableRecord(state: StateObject, snapshot: Snapshot) =
sync { newWritableRecordLocked(state, snapshot) }
private fun <T : StateRecord> T.newWritableRecordLocked(state: StateObject, snapshot: Snapshot): T {
// Calling used() on a state object might return the same record for each thread calling
// used() therefore selecting the record to reuse should be guarded.
// Note: setting the snapshotId to Int.MAX_VALUE will make it invalid for all snapshots.
// This means the lock can be released as used() will no longer select it. Using id could
// also be used but it puts the object into a state where the reused value appears to be
// the current valid value for the snapshot. This is not an issue if the snapshot is only
// being read from a single thread but using Int.MAX_VALUE allows multiple readers,
// single writer, of a snapshot. Note that threads reading a mutating snapshot should not
// cache the result of readable() as the mutating thread calls to writable() can change the
// result of readable().
val newData = newOverwritableRecordLocked(state)
newData.snapshotId =
return newData
internal fun <T : StateRecord> T.newOverwritableRecordLocked(state: StateObject): T {
// Calling used() on a state object might return the same record for each thread calling
// used() therefore selecting the record to reuse should be guarded.
// Note: setting the snapshotId to Int.MAX_VALUE will make it invalid for all snapshots.
// This means the lock can be released as used() will no longer select it. Using id could
// also be used but it puts the object into a state where the reused value appears to be
// the current valid value for the snapshot. This is not an issue if the snapshot is only
// being read from a single thread but using Int.MAX_VALUE allows multiple readers,
// single writer, of a snapshot. Note that threads reading a mutating snapshot should not
// cache the result of readable() as the mutating thread calls to writable() can change the
// result of readable().
return (usedLocked(state) as T?)?.apply {
snapshotId = Int.MAX_VALUE
} ?: create().apply {
snapshotId = Int.MAX_VALUE = state.firstStateRecord
state.prependStateRecord(this as T)
} as T
internal fun notifyWrite(snapshot: Snapshot, state: StateObject) {
* Call [block] with a writable state record for [snapshot] of the given record. It is
* assumed that this is called for the first state record in a state object. If the snapshot is
* read-only calling this will throw.
inline fun <T : StateRecord, R> T.writable(
state: StateObject,
snapshot: Snapshot,
block: T.() -> R
): R {
// A writable record will always be the readable record (as all newer records are invalid it
// must be the newest valid record). This means that if the readable record is not from the
// current snapshot, a new record must be created. To create a new writable record, a record
// can be reused, if possible, and the readable record is applied to it. If a record cannot
// be reused, a new record is created and the readable record is applied to it. Once the
// values are correct the record is made live by giving it the current snapshot id.
// Writes need to be in a `sync` block as all writes in flight must be completed before a new
// snapshot is take. Writing in a sync block ensures this is the case because new snapshots
// are also in a sync block.
return sync {
this.writableRecord(state, snapshot).block()
}.also { notifyWrite(snapshot, state) }
* Call [block] with a writable state record for the given record. It is assumed that this is
* called for the first state record in a state object. A record is writable if it was created in
* the current mutable snapshot.
inline fun <T : StateRecord, R> T.writable(state: StateObject, block: T.() -> R): R {
var snapshot: Snapshot = snapshotInitializer
return sync {
snapshot = Snapshot.current
this.writableRecord(state, snapshot).block()
}.also {
notifyWrite(snapshot, state)
* Call [block] with a writable state record for the given record. It is assumed that this is
* called for the first state record in a state object. A record is writable if it was created in
* the current mutable snapshot. This should only be used when the record will be overwritten in
* its entirety (such as having only one field and that field is written to).
* WARNING: If the caller doesn't overwrite all the fields in the state record the object will be
* inconsistent and the fields not written are almost guaranteed to be incorrect. If it is
* possible that [block] will not write to all the fields use [writable] instead.
* @param state The object that has this record in its record list.
* @param candidate The current for the snapshot record returned by [withCurrent]
* @param block The block that will mutate all the field of the record.
internal inline fun <T : StateRecord, R> T.overwritable(
state: StateObject,
candidate: T,
block: T.() -> R
): R {
var snapshot: Snapshot = snapshotInitializer
return sync {
snapshot = Snapshot.current
this.overwritableRecord(state, snapshot, candidate).block()
}.also {
notifyWrite(snapshot, state)
* Produce a set of optimistic merges of the state records, this is performed outside the
* a synchronization block to reduce the amount of time taken in the synchronization block
* reducing the thread contention of merging state values.
private fun optimisticMerges(
currentSnapshot: MutableSnapshot,
applyingSnapshot: MutableSnapshot,
invalidSnapshots: SnapshotIdSet
): Map<StateRecord, StateRecord>? {
val modified = applyingSnapshot.modified
val id =
if (modified == null) return null
val start = applyingSnapshot.invalid.set(
var result: MutableMap<StateRecord, StateRecord>? = null
for (state in modified) {
val first = state.firstStateRecord
val current = readable(first, id, invalidSnapshots) ?: continue
val previous = readable(first, id, start) ?: continue
if (current != previous) {
// Try to produce a merged state record
val applied = readable(first,, applyingSnapshot.invalid)
?: readError()
val merged = state.mergeRecords(previous, current, applied)
if (merged != null) {
result ?: hashMapOf<StateRecord, StateRecord>().also {
result = it
)[current] = merged
} else {
// If one fails don't bother calculating the others as they are likely not going
// to be used. There is an unlikely case that a optimistic merge cannot be
// produced but the snapshot will apply because, once the synchronization is taken,
// the current state can be merge. This routine errors on the side of reduced
// overall work by not performing work that is likely to be ignored.
return null
return result
private fun reportReadonlySnapshotWrite(): Nothing {
error("Cannot modify a state object in a read-only snapshot")
* Returns the current record without notifying any read observers.
internal fun <T : StateRecord> current(r: T, snapshot: Snapshot) =
readable(r,, snapshot.invalid) ?: readError()
internal fun <T : StateRecord> current(r: T) =
Snapshot.current.let { snapshot ->
readable(r,, snapshot.invalid) ?: sync {
Snapshot.current.let { syncSnapshot ->
readable(r,, syncSnapshot.invalid)
} ?: readError()
* Provides a [block] with the current record, without notifying any read observers.
* @see readable
inline fun <T : StateRecord, R> T.withCurrent(block: (r: T) -> R): R =
* Helper routine to add a range of values ot a snapshot set
internal fun SnapshotIdSet.addRange(from: Int, until: Int): SnapshotIdSet {
var result = this
for (invalidId in from until until)
result = result.set(invalidId)
return result