blob: a8a05834456676fffee17b286d9f46172653ee68 [file] [log] [blame]
/*
 * Copyright 2021 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package androidx.datastore.core
18
Dustin Lam06715062022-05-23 15:27:12 -070019import androidx.kruth.assertThat
20import java.io.DataInputStream
21import java.io.DataOutputStream
22import java.io.IOException
23import java.io.InputStream
24import java.io.OutputStream
Dustin Lam06715062022-05-23 15:27:12 -070025import java.util.concurrent.TimeUnit
Dustin Lam06715062022-05-23 15:27:12 -070026import kotlin.test.Test
Jim Sprochef5fb012022-12-06 15:28:12 -080027import kotlin.time.Duration.Companion.seconds
rohitsat1303fed862021-01-13 10:21:22 -080028import kotlinx.coroutines.CoroutineScope
Yigit Boyar499b6452023-08-21 12:14:05 -070029import kotlinx.coroutines.Dispatchers
rohitsat1303fed862021-01-13 10:21:22 -080030import kotlinx.coroutines.Job
rohitsat1303fed862021-01-13 10:21:22 -080031import kotlinx.coroutines.async
32import kotlinx.coroutines.awaitAll
Yigit Boyar499b6452023-08-21 12:14:05 -070033import kotlinx.coroutines.cancel
rohitsat1303fed862021-01-13 10:21:22 -080034import kotlinx.coroutines.delay
Yigit Boyar499b6452023-08-21 12:14:05 -070035import kotlinx.coroutines.flow.Flow
36import kotlinx.coroutines.flow.collectIndexed
37import kotlinx.coroutines.flow.first
rohitsat1303fed862021-01-13 10:21:22 -080038import kotlinx.coroutines.flow.reduce
rohitsat1303fed862021-01-13 10:21:22 -080039import kotlinx.coroutines.flow.takeWhile
rohitsat1303fed862021-01-13 10:21:22 -080040import kotlinx.coroutines.runBlocking
rohitsat132baf1a02021-01-28 13:05:03 -080041import kotlinx.coroutines.withTimeout
Yigit Boyar499b6452023-08-21 12:14:05 -070042import org.junit.After
rohitsat1303fed862021-01-13 10:21:22 -080043import org.junit.Rule
rohitsat1303fed862021-01-13 10:21:22 -080044import org.junit.rules.TemporaryFolder
rohitsat133d5fe182021-01-27 16:00:59 -080045import org.junit.rules.Timeout
Yigit Boyar499b6452023-08-21 12:14:05 -070046import org.junit.runner.RunWith
47import org.junit.runners.JUnit4
rohitsat1303fed862021-01-13 10:21:22 -080048
/**
 * Stress tests that run [READER_COUNT] concurrent readers and [WRITER_COUNT] concurrent writers
 * against a single file-backed DataStore holding a `Long` counter.
 *
 * Writers increment the counter until it reaches [FINAL_TEST_VALUE]; readers observe the data flow
 * until that value shows up and assert that the values they see never go backwards.
 */
@RunWith(JUnit4::class)
class SingleProcessDataStoreStressTest {
    @get:Rule
    val tempFolder = TemporaryFolder()

    @get:Rule
    val timeout = Timeout(4, TimeUnit.MINUTES)

    // using real dispatchers here to test parallelism
    private val testScope = CoroutineScope(Job() + Dispatchers.IO)

    @After
    fun cancelTestScope() {
        testScope.cancel()
    }

    /** Readers and writers run concurrently with a serializer that never fails. */
    @Test
    fun testManyConcurrentReadsAndWrites() = runBlocking<Unit> {
        val dataStore = createDataStore(LongSerializer(failWrites = false, failReads = false))
        assertThat(dataStore.data.first()).isEqualTo(0)

        val readers = launchReaders {
            dataStore.data
                .takeWhile { it < FINAL_TEST_VALUE }
                .assertIncreasingAfterFirstRead()
        }
        // No retry here: any IOException from a write fails the test.
        val writers = launchWriters(dataStore, retryOnIoFailure = false)

        // There's no reason this should take more than a few seconds once writers complete and
        // there's no reason writers won't complete.
        withTimeout(10.seconds) {
            (writers + readers).awaitAll()
        }
    }

    /** Same workload, while the serializer's write-failure flag is toggled back and forth. */
    @Test
    fun testManyConcurrentReadsAndWrites_withIntermittentWriteFailures() = runBlocking<Unit> {
        val serializer = LongSerializer(failWrites = false, failReads = false)
        val dataStore = createDataStore(serializer)

        val readers = launchReaders {
            // we don't use `assertIncreasingAfterFirstRead` here because failed writes
            // might increment the shared counter and trigger more reads than necessary.
            // Hence, we only assert for ">=" here.
            dataStore.data
                .takeWhile { it < FINAL_TEST_VALUE }
                .assertNonDecreasing()
        }
        val writers = launchWriters(dataStore, retryOnIoFailure = true)

        // Flip the failure flag every 10 ms, then make sure writes are allowed to succeed so
        // that the writers can finish.
        serializer.failWrites = true
        repeat(10) {
            delay(10)
            serializer.failWrites = !serializer.failWrites
        }
        serializer.failWrites = false

        // There's no reason this should take more than a few seconds once writers complete and
        // there's no reason writers won't complete.
        withTimeout(10.seconds) {
            (writers + readers).awaitAll()
        }
    }

    /** Same workload, with the serializer failing every read for the first 100 ms. */
    @Test
    fun testManyConcurrentReadsAndWrites_withBeginningReadFailures() = runBlocking<Unit> {
        val serializer = LongSerializer(failWrites = false, failReads = true)
        val dataStore = createDataStore(serializer)

        val readers = launchReaders {
            // we retry because the test itself is creating read failures on purpose.
            var reachedFinalValue = false
            while (!reachedFinalValue) {
                try {
                    dataStore.data
                        .takeWhile { it < FINAL_TEST_VALUE }
                        .assertIncreasingAfterFirstRead()
                    reachedFinalValue = true
                } catch (_: IOException) {
                    // reader is configured to throw IO exceptions in the test, hence it is
                    // ok to get them here. It means we need to go back to reading until we
                    // reach to the final value.
                }
            }
        }
        val writers = launchWriters(dataStore, retryOnIoFailure = true)

        // Read failures for first 100 ms
        delay(100)
        serializer.failReads = false

        // There's no reason this should take more than a few seconds once writers complete and
        // there's no reason writers won't complete.
        withTimeout(10.seconds) {
            (writers + readers).awaitAll()
        }
    }

    /**
     * Creates a DataStore in [testScope] that is backed by a fresh path inside [tempFolder].
     *
     * `newFile()` creates the file on disk, so it is deleted again right away; only the path is
     * handed to the DataStore.
     */
    private fun createDataStore(serializer: LongSerializer): DataStore<Long> {
        val file = tempFolder.newFile()
        file.delete()
        return DataStoreFactory.create(
            serializer = serializer,
            scope = testScope
        ) { file }
    }

    /** Starts [READER_COUNT] coroutines in [testScope], each running [read] once. */
    private fun launchReaders(read: suspend () -> Unit) =
        (0 until READER_COUNT).map {
            testScope.async { read() }
        }

    /**
     * Starts [WRITER_COUNT] coroutines in [testScope]; each one increments the stored counter
     * [UPDATES_PER_WRITER] times.
     *
     * @param retryOnIoFailure see [incrementCounter]
     */
    private fun launchWriters(dataStore: DataStore<Long>, retryOnIoFailure: Boolean) =
        (0 until WRITER_COUNT).map {
            testScope.async {
                repeat(UPDATES_PER_WRITER) {
                    dataStore.incrementCounter(retryOnIoFailure)
                }
            }
        }

    /**
     * Increments the stored value by one.
     *
     * @param retryOnIoFailure when `true`, an [IOException] from the update is swallowed and the
     *   update is attempted again until it succeeds; when `false`, the exception is rethrown.
     */
    private suspend fun DataStore<Long>.incrementCounter(retryOnIoFailure: Boolean) {
        while (true) {
            try {
                updateData { it.inc() }
                return
            } catch (e: IOException) {
                if (!retryOnIoFailure) throw e
            }
        }
    }

    /**
     * Collects the flow and asserts that each value is strictly greater than the previous one,
     * with the exception described below.
     */
    private suspend fun Flow<Long>.assertIncreasingAfterFirstRead() {
        // at a very rare race condition, we might read the new value with old
        // version during initialization due to reads without locks. So here,
        // we assert that it is increasing except for the first 2 reads which can be the same value
        var previous: Long = -1
        collectIndexed { index, value ->
            if (index <= 1) {
                assertThat(value).isAtLeast(previous)
            } else {
                assertThat(value).isGreaterThan(previous)
            }
            previous = value
        }
    }

    /**
     * Collects the flow and asserts that no value is smaller than the one before it.
     *
     * This deliberately collects instead of using `reduce`: `reduce` throws
     * `NoSuchElementException` on an empty flow, which would fail a reader that only starts
     * observing after the counter has already reached its final value. An empty flow simply
     * passes here.
     */
    private suspend fun Flow<Long>.assertNonDecreasing() {
        // Long.MIN_VALUE makes the check on the very first value trivially true.
        var previous = Long.MIN_VALUE
        collectIndexed { _, value ->
            assertThat(value).isAtLeast(previous)
            previous = value
        }
    }

    /**
     * Serializer that stores a single `Long` and can be told to fail on demand.
     *
     * Both flags are volatile so that flipping them from the test coroutine is visible to the
     * coroutines doing the reads and writes.
     *
     * @param failWrites while `true`, [writeTo] throws an [IOException] instead of writing
     * @param failReads while `true`, [readFrom] throws an [IOException] instead of reading
     */
    private class LongSerializer(
        @Volatile var failWrites: Boolean,
        @Volatile var failReads: Boolean
    ) : Serializer<Long> {
        override val defaultValue = 0L

        override suspend fun readFrom(input: InputStream): Long {
            if (failReads) {
                throw IOException("failing read")
            }
            return DataInputStream(input).readLong()
        }

        override suspend fun writeTo(t: Long, output: OutputStream) {
            if (failWrites) {
                throw IOException("failing write")
            }
            DataOutputStream(output).writeLong(t)
        }
    }

    companion object {
        private const val READER_COUNT = 100
        private const val WRITER_COUNT = 25
        private const val FINAL_TEST_VALUE = 100
        private const val UPDATES_PER_WRITER = FINAL_TEST_VALUE / WRITER_COUNT

        init {
            // The writers together must perform exactly FINAL_TEST_VALUE increments, otherwise
            // the readers' takeWhile condition would never be satisfied as intended.
            check(UPDATES_PER_WRITER * WRITER_COUNT == FINAL_TEST_VALUE) {
                "inconsistent test setup"
            }
        }
    }
}