blob: 3d3edffbf0ae577e850fc682db1f7b348a520a09 [file] [log] [blame]
/*
 * Copyright 2023 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
Alan Viverettedb89add2023-11-21 16:29:40 -050017@file:JvmName("MultiProcessCoordinatorKt") // Workaround for b/313964643
18
Zhiyuan Wang7a2c7192023-01-25 05:28:00 -080019package androidx.datastore.core
20
Zhiyuan Wang7a2c7192023-01-25 05:28:00 -080021import java.io.File
22import java.io.FileInputStream
23import java.io.FileOutputStream
24import java.io.IOException
25import java.nio.channels.FileLock
Alan Viverettee9809c62023-07-19 14:42:11 +000026import kotlin.contracts.ExperimentalContracts
Zhiyuan Wang7a2c7192023-01-25 05:28:00 -080027import kotlin.coroutines.CoroutineContext
Zhiyuan Wang7a2c7192023-01-25 05:28:00 -080028import kotlinx.coroutines.flow.Flow
Zhiyuan Wang7a2c7192023-01-25 05:28:00 -080029import kotlinx.coroutines.sync.Mutex
30import kotlinx.coroutines.sync.withLock
31import kotlinx.coroutines.withContext
32
/**
 * [InterProcessCoordinator] implementation that guards critical sections with an in-process
 * [Mutex] combined with a [FileLock] taken on a sibling `<file>.lock` file, and that exposes a
 * version number through a [SharedCounter] created from a sibling `<file>.version` file.
 *
 * @param context coroutine context used to run the first (initializing) access to the
 *   [SharedCounter].
 * @param file the file whose absolute path is used to derive the lock and version files, and
 *   which is observed to produce [updateNotifications].
 */
internal class MultiProcessCoordinator(
    private val context: CoroutineContext,
    // NOTE(review): `protected` on a final class is effectively `private`; kept as-is so the
    // declared interface of the class does not change.
    protected val file: File
) : InterProcessCoordinator {
    // TODO(b/269375542): the flow should `flowOn` the provided [context]
    override val updateNotifications: Flow<Unit> = MulticastFileObserver.observe(file)

    private val inMemoryMutex = Mutex()

    // Created on first use so that constructing the coordinator does not touch the disk.
    private val lockFile: File by lazy {
        fileWithSuffix(LOCK_SUFFIX).also { it.createIfNotExists() }
    }

    private val lazySharedCounter = lazy {
        SharedCounter.loadLib()
        SharedCounter.create {
            fileWithSuffix(VERSION_SUFFIX).also { it.createIfNotExists() }
        }
    }
    private val sharedCounter by lazySharedCounter

    /**
     * Runs [block] while holding both the in-memory mutex and an exclusive file lock on the lock
     * file, and returns the value produced by [block].
     *
     * The file lock is released, and the lock file stream closed, whether [block] returns
     * normally or throws.
     */
    override suspend fun <T> lock(block: suspend () -> T): T {
        inMemoryMutex.withLock {
            FileOutputStream(lockFile).use { lockFileStream ->
                var lock: FileLock? = null
                try {
                    lock = lockFileStream.channel.lock(0L, Long.MAX_VALUE, /* shared= */ false)
                    return block()
                } finally {
                    // `lock` is still null when acquiring it threw.
                    lock?.release()
                }
            }
        }
    }

    /**
     * Runs [block] after attempting, without waiting, to take the in-memory mutex and then a
     * shared file lock on the lock file. [block] always runs; its argument is `true` only when
     * both attempts succeeded.
     */
    @OptIn(ExperimentalContracts::class) // withTryLock
    override suspend fun <T> tryLock(block: suspend (Boolean) -> T): T {
        inMemoryMutex.withTryLock<T> { locked ->
            if (!locked) {
                return block(false)
            }
            FileInputStream(lockFile).use { lockFileStream ->
                val lock: FileLock? = lockFileStream.tryAcquireSharedLock()
                try {
                    return block(lock != null)
                } finally {
                    lock?.release()
                }
            }
        }
    }

    /** Returns the current version read from the shared counter. */
    override suspend fun getVersion(): Int {
        // Only switch coroutine if sharedCounter is not initialized because initialization incurs
        // disk IO
        return withLazyCounter { it.getValue() }
    }

    /** Increments the version in the shared counter and returns the new value. */
    override suspend fun incrementAndGetVersion(): Int {
        // Only switch coroutine if sharedCounter is not initialized because initialization incurs
        // disk IO
        return withLazyCounter { it.incrementAndGetValue() }
    }

    /**
     * Attempts to take a shared lock over the whole file backing this stream without blocking.
     *
     * @return the acquired lock, or `null` when it could not be acquired.
     * @throws IOException when locking fails with a message that does not start with
     *   [LOCK_ERROR_MESSAGE].
     */
    private fun FileInputStream.tryAcquireSharedLock(): FileLock? =
        try {
            channel.tryLock(
                /* position= */ 0L,
                /* size= */ Long.MAX_VALUE,
                /* shared= */ true
            )
        } catch (ex: IOException) {
            // TODO(b/255419657): Update the shared lock IOException handling logic for KMM.

            // Some platforms / OS do not support shared lock and convert shared lock requests to
            // exclusive lock requests. If the lock can't be acquired, it will throw an
            // IOException with EAGAIN error, instead of returning null as specified in
            // FileChannel.tryLock. We only continue if the error message is EAGAIN, otherwise
            // just throw it.
            if (ex.message?.startsWith(LOCK_ERROR_MESSAGE) != true) {
                throw ex
            }
            null
        }

    /** Returns a [File] whose path is the absolute path of [file] followed by [suffix]. */
    private fun fileWithSuffix(suffix: String): File {
        return File(file.absolutePath + suffix)
    }

    /** Creates the receiver's parent directories and then the file itself if it is missing. */
    private fun File.createIfNotExists() {
        createParentDirectories()
        if (!exists()) {
            createNewFile()
        }
    }

    /**
     * Creates the parent directories of the receiver's canonical path, if it has a parent.
     *
     * @throws IOException when the parent is still not a directory after the attempt.
     */
    private fun File.createParentDirectories() {
        val parent: File = canonicalFile.parentFile ?: return
        parent.mkdirs()
        if (!parent.isDirectory) {
            throw IOException("Unable to create parent directories of $this")
        }
    }

    /**
     * [SharedCounter] needs to be initialized in a separate coroutine so it does not violate
     * StrictMode policy in the main thread.
     *
     * Once the counter has been initialized, [block] runs directly in the caller's context;
     * otherwise it runs inside [withContext] using [context].
     */
    private suspend inline fun <T> withLazyCounter(
        crossinline block: suspend (SharedCounter) -> T
    ): T {
        return if (lazySharedCounter.isInitialized()) {
            block(sharedCounter)
        } else {
            withContext(context) {
                block(sharedCounter)
            }
        }
    }

    private companion object {
        /** Suffix appended to the data file path to obtain the lock file path. */
        private const val LOCK_SUFFIX = ".lock"

        /** Suffix appended to the data file path to obtain the version file path. */
        private const val VERSION_SUFFIX = ".version"

        /** Message prefix of the [IOException] treated as "lock not acquired" in `tryLock`. */
        private const val LOCK_ERROR_MESSAGE = "fcntl failed: EAGAIN"
    }
}
166
/**
 * Builds the [InterProcessCoordinator] to use when several processes share one data file.
 *
 * @param context the coroutine context to be used by the [MultiProcessCoordinator] for IO
 *   operations.
 * @param file the File in which [DataStore] stores the data.
 * @return a new [MultiProcessCoordinator] constructed from [context] and [file].
 */
@Suppress("StreamFiles")
fun createMultiProcessCoordinator(context: CoroutineContext, file: File): InterProcessCoordinator {
    return MultiProcessCoordinator(context, file)
}