/*
* Copyright 2022 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package androidx.datastore.core
import androidx.annotation.GuardedBy
import java.io.File
import java.io.FileInputStream
import java.io.FileNotFoundException
import java.io.FileOutputStream
import java.io.IOException
import java.util.concurrent.atomic.AtomicBoolean
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
/**
* The Java IO File version of the Storage<T> interface. It reads and writes [T] to a given
* file location.
*
* @param serializer The serializer that can read and write [T] to and from a byte stream.
* @param coordinatorProducer The producer that provides the [InterProcessCoordinator] used to
* coordinate IO operations across processes if needed. By default it provides a single-process
* coordinator, which doesn't support cross-process use cases.
* @param produceFile The file producer that returns the file that will be read and written.
*/
class FileStorage<T>(
private val serializer: Serializer<T>,
private val coordinatorProducer: (File) -> InterProcessCoordinator = {
createSingleProcessCoordinator(it)
},
private val produceFile: () -> File
) : Storage<T> {
override fun createConnection(): StorageConnection<T> {
val file = produceFile().canonicalFile
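// Only one DataStore (and therefore one StorageConnection) may be active per file; the
// canonical path is tracked so a second createConnection() on the same file fails fast.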
synchronized(activeFilesLock) {
val path = file.absolutePath
check(!activeFiles.contains(path)) {
"There are multiple DataStores active for the same file: $path. You should " +
"either maintain your DataStore as a singleton or confirm that there is " +
"no two DataStore's active on the same file (by confirming that the scope" +
" is cancelled)."
}
activeFiles.add(path)
}
return FileStorageConnection(file, serializer, coordinatorProducer(file)) {
synchronized(activeFilesLock) {
activeFiles.remove(file.absolutePath)
}
}
}
internal companion object {
/**
* Absolute paths of files for which there is currently an active DataStore. A DataStore is
* active until the scope it was created with has been cancelled. Files aren't added to this
* set until the first read/write because the file path is computed asynchronously.
*/
@GuardedBy("activeFilesLock")
internal val activeFiles = mutableSetOf<String>()
internal val activeFilesLock = Any()
}
}
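// Illustrative usage sketch: how a caller might pair FileStorage with a Serializer
// implementation. `StringSerializer` and `dataDir` are hypothetical names used only for
// this example.
//
//     object StringSerializer : Serializer<String> {
//         override val defaultValue: String = ""
//
//         override suspend fun readFrom(input: InputStream): String =
//             input.readBytes().decodeToString()
//
//         override suspend fun writeTo(t: String, output: OutputStream) {
//             output.write(t.encodeToByteArray())
//         }
//     }
//
//     val storage: Storage<String> = FileStorage(
//         serializer = StringSerializer,
//         produceFile = { File(dataDir, "settings.txt") }
//     )
//     val connection = storage.createConnection()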
internal class FileStorageConnection<T>(
private val file: File,
private val serializer: Serializer<T>,
override val coordinator: InterProcessCoordinator,
private val onClose: () -> Unit
) : StorageConnection<T> {
private val closed = AtomicBoolean(false)
// TODO(b/233402915): support multiple readers
private val transactionMutex = Mutex()
override suspend fun <R> readScope(
block: suspend ReadScope<T>.(locked: Boolean) -> R
): R {
checkNotClosed()
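// Reads don't require exclusive access: try to take the transaction lock, but proceed
// with the read either way and report to the block whether the lock is actually held.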
val lock = transactionMutex.tryLock()
try {
return FileReadScope(file, serializer).use {
block(it, lock)
}
} finally {
if (lock) {
transactionMutex.unlock()
}
}
}
override suspend fun writeScope(block: suspend WriteScope<T>.() -> Unit) {
checkNotClosed()
file.createParentDirectories()
transactionMutex.withLock {
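// Write to a ".tmp" scratch file first, then rename it over the real file, so a crash
// mid-write can't leave a partially written file in place.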
val scratchFile = File(file.absolutePath + ".tmp")
try {
FileWriteScope(scratchFile, serializer).use {
block(it)
}
if (scratchFile.exists() && !scratchFile.renameTo(file)) {
throw IOException(
"Unable to rename $scratchFile. " +
"This likely means that there are multiple instances of DataStore " +
"for this file. Ensure that you are only creating a single instance of " +
"datastore for this file."
)
}
} catch (ex: IOException) {
if (scratchFile.exists()) {
scratchFile.delete() // Swallow failure to delete
}
throw ex
}
}
}
public override fun close() {
closed.set(true)
onClose()
}
private fun checkNotClosed() {
check(!closed.get()) { "StorageConnection has already been disposed." }
}
private fun File.createParentDirectories() {
val parent: File? = canonicalFile.parentFile
parent?.let {
it.mkdirs()
if (!it.isDirectory) {
throw IOException("Unable to create parent directories of $this")
}
}
}
}
internal open class FileReadScope<T>(
protected val file: File,
protected val serializer: Serializer<T>
) : ReadScope<T> {
private val closed = AtomicBoolean(false)
override suspend fun readData(): T {
checkNotClosed()
return try {
FileInputStream(file).use { stream ->
serializer.readFrom(stream)
}
} catch (ex: FileNotFoundException) {
if (file.exists()) {
// The file exists now even though opening it failed, most likely because another
// process created it after the initial read attempt but before `file.exists()` was
// called. Re-read instead of failing; if this second read also throws, the file exists
// but can't be read, so the FileNotFoundException propagates because something is wrong.
return FileInputStream(file).use { stream ->
serializer.readFrom(stream)
}
}
return serializer.defaultValue
}
}
override fun close() {
closed.set(true)
}
protected fun checkNotClosed() {
check(!closed.get()) { "This scope has already been closed." }
}
}
internal class FileWriteScope<T>(file: File, serializer: Serializer<T>) :
FileReadScope<T>(file, serializer), WriteScope<T> {
override suspend fun writeData(value: T) {
checkNotClosed()
val fos = FileOutputStream(file)
fos.use { stream ->
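// Wrap the stream so the serializer can't close it before fd.sync() below has flushed
// the written data to disk.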
serializer.writeTo(value, UncloseableOutputStream(stream))
stream.fd.sync()
// TODO(b/151635324): fsync the directory, otherwise a badly timed crash could
// result in reverting to a previous state.
}
}
}