Convert HandlerAdapterExecutor to ScheduledExecutorService

 ScheduledExecutorService allows tasks to be posted in the future, but
 still provides all of the same functionality as Executor.

 ScheduledExecutorService also extends the Executor interface, so it
 can be used as a drop-in replacement in existing code. To accommodate
 this, the methods on CameraXExecutors which returned handler-based
 Executors now return ScheduledExecutorService instead.

Bug: 121160431
Test: ./gradlew camera:camera-core:connectedCheck
Change-Id: I2eb45522adcd40d22c2ece58f53b63be9c0ad27b
diff --git a/camera/core/src/androidTest/java/androidx/camera/core/impl/utils/executors/HandlerAdapterExecutorTest.java b/camera/core/src/androidTest/java/androidx/camera/core/impl/utils/executors/HandlerAdapterExecutorTest.java
deleted file mode 100644
index c9d080b..0000000
--- a/camera/core/src/androidTest/java/androidx/camera/core/impl/utils/executors/HandlerAdapterExecutorTest.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Copyright 2019 The Android Open Source Project
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package androidx.camera.core.impl.utils.executors;
-
-import static com.google.common.truth.Truth.assertThat;
-
-import android.os.Handler;
-import android.os.HandlerThread;
-import android.os.Looper;
-
-import androidx.camera.core.impl.utils.executor.CameraXExecutors;
-import androidx.test.annotation.UiThreadTest;
-import androidx.test.ext.junit.runners.AndroidJUnit4;
-import androidx.test.filters.SmallTest;
-
-import org.junit.Test;
-import org.junit.runner.RunWith;
-
-import java.util.concurrent.Executor;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-
-@SmallTest
-@RunWith(AndroidJUnit4.class)
-public class HandlerAdapterExecutorTest {
-
-    @Test
-    public void canExecuteOnCurrentThreadExecutor() throws InterruptedException {
-        final AtomicBoolean executed = new AtomicBoolean(false);
-        Thread thread = new Thread("canExecuteOnCurrentThreadExecutor_thread") {
-            @Override
-            public void run() {
-                Looper.prepare();
-
-                Executor currentExecutor = CameraXExecutors.myLooperExecutor();
-
-                currentExecutor.execute(new Runnable() {
-                    @Override
-                    public void run() {
-                        executed.set(true);
-                        Looper.myLooper().quitSafely();
-                    }
-                });
-
-                Looper.loop();
-            }
-        };
-
-        thread.start();
-        thread.join();
-
-        assertThat(executed.get()).isTrue();
-    }
-
-    @Test
-    public void retrieveCurrentThreadExecutor_throwsOnNonLooperThread()
-            throws InterruptedException {
-        final AtomicReference<Throwable> thrownException = new AtomicReference<>(null);
-
-        Thread thread = new Thread("retrieveCurrentThreadExecutor_throwsOnNonLooperThread_thread") {
-            @Override
-            public void run() {
-                try {
-                    CameraXExecutors.myLooperExecutor();
-                } catch (Throwable throwable) {
-                    thrownException.set(throwable);
-                }
-
-            }
-        };
-
-        thread.start();
-        thread.join();
-
-        assertThat(thrownException.get()).isNotNull();
-        assertThat(thrownException.get()).isInstanceOf(IllegalStateException.class);
-    }
-
-    @Test
-    @UiThreadTest
-    public void canRetrieveMainThreadExecutor_withCurrentThreadExecutor() {
-        // Current thread is main thread since this test is annotated @UiThreadTest
-        Executor currentThreadExecutor = CameraXExecutors.myLooperExecutor();
-        Executor mainThreadExecutor = CameraXExecutors.mainThreadExecutor();
-
-        assertThat(currentThreadExecutor).isSameInstanceAs(mainThreadExecutor);
-    }
-
-    @Test
-    public void canWrapHandlerAndExecute() throws InterruptedException {
-        final HandlerThread handlerThread = new HandlerThread("canWrapHandlerAndExecute_thread");
-        handlerThread.start();
-
-        Handler handler = new Handler(handlerThread.getLooper());
-
-        Executor executor = CameraXExecutors.newHandlerExecutor(handler);
-        final Semaphore semaphore = new Semaphore(0);
-
-        executor.execute(new Runnable() {
-            @Override
-            public void run() {
-                // Clean up the handlerThread while we're here.
-                handlerThread.quitSafely();
-                semaphore.release();
-            }
-        });
-
-        // Wait for the thread to execute
-        semaphore.acquire();
-
-        // No need to assert. If we don't time out, the test passed.
-    }
-}
diff --git a/camera/core/src/androidTest/java/androidx/camera/core/impl/utils/executors/HandlerScheduledExecutorServiceTest.java b/camera/core/src/androidTest/java/androidx/camera/core/impl/utils/executors/HandlerScheduledExecutorServiceTest.java
new file mode 100644
index 0000000..18beebd
--- /dev/null
+++ b/camera/core/src/androidTest/java/androidx/camera/core/impl/utils/executors/HandlerScheduledExecutorServiceTest.java
@@ -0,0 +1,262 @@
+/*
+ * Copyright 2019 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package androidx.camera.core.impl.utils.executors;
+
+import static com.google.common.truth.Truth.assertThat;
+
+import static org.mockito.Mockito.mock;
+
+import android.os.Handler;
+import android.os.HandlerThread;
+import android.os.Looper;
+import android.os.SystemClock;
+
+import androidx.camera.core.impl.utils.executor.CameraXExecutors;
+import androidx.test.annotation.UiThreadTest;
+import androidx.test.ext.junit.runners.AndroidJUnit4;
+import androidx.test.filters.MediumTest;
+import androidx.test.filters.SmallTest;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+@SmallTest
+@RunWith(AndroidJUnit4.class)
+public class HandlerScheduledExecutorServiceTest {
+
+    private static final long DELAYED_TASK_DELAY_MILLIS = 250;
+    private static final int MAGIC_VALUE = 42;
+
+    @Test
+    public void canExecuteOnCurrentThreadExecutor() throws InterruptedException {
+        final AtomicBoolean executed = new AtomicBoolean(false);
+        Thread thread = new Thread("canExecuteOnCurrentThreadExecutor_thread") {
+            @Override
+            public void run() {
+                Looper.prepare();
+
+                Executor currentExecutor = CameraXExecutors.myLooperExecutor();
+
+                currentExecutor.execute(new Runnable() {
+                    @Override
+                    public void run() {
+                        executed.set(true);
+                        Looper.myLooper().quitSafely();
+                    }
+                });
+
+                Looper.loop();
+            }
+        };
+
+        thread.start();
+        thread.join();
+
+        assertThat(executed.get()).isTrue();
+    }
+
+    @Test
+    public void retrieveCurrentThreadExecutor_throwsOnNonLooperThread()
+            throws InterruptedException {
+        final AtomicReference<Throwable> thrownException = new AtomicReference<>(null);
+
+        Thread thread = new Thread("retrieveCurrentThreadExecutor_throwsOnNonLooperThread_thread") {
+            @Override
+            public void run() {
+                try {
+                    CameraXExecutors.myLooperExecutor();
+                } catch (Throwable throwable) {
+                    thrownException.set(throwable);
+                }
+
+            }
+        };
+
+        thread.start();
+        thread.join();
+
+        assertThat(thrownException.get()).isNotNull();
+        assertThat(thrownException.get()).isInstanceOf(IllegalStateException.class);
+    }
+
+    @Test
+    @UiThreadTest
+    public void canRetrieveMainThreadExecutor_withCurrentThreadExecutor() {
+        // Current thread is main thread since this test is annotated @UiThreadTest
+        Executor currentThreadExecutor = CameraXExecutors.myLooperExecutor();
+        Executor mainThreadExecutor = CameraXExecutors.mainThreadExecutor();
+
+        assertThat(currentThreadExecutor).isSameInstanceAs(mainThreadExecutor);
+    }
+
+    @Test
+    public void canWrapHandlerAndExecute() throws InterruptedException {
+        final HandlerThread handlerThread = new HandlerThread("canWrapHandlerAndExecute_thread");
+        handlerThread.start();
+
+        Handler handler = new Handler(handlerThread.getLooper());
+
+        Executor executor = CameraXExecutors.newHandlerExecutor(handler);
+        final Semaphore semaphore = new Semaphore(0);
+
+        executor.execute(new Runnable() {
+            @Override
+            public void run() {
+                // Clean up the handlerThread while we're here.
+                handlerThread.quitSafely();
+                semaphore.release();
+            }
+        });
+
+        // Wait for the thread to execute
+        semaphore.acquire();
+
+        // No need to assert. If we don't time out, the test passed.
+    }
+
+    @Test
+    @MediumTest
+    public void canExecuteTaskInFuture() throws InterruptedException {
+        final ScheduledExecutorService executor = CameraXExecutors.mainThreadExecutor();
+
+        final AtomicLong startTimeMillis = new AtomicLong(Long.MAX_VALUE);
+        final AtomicLong executionTimeMillis = new AtomicLong(0);
+        final Semaphore semaphore = new Semaphore(0);
+        Runnable postDelayedTaskRunnable = new Runnable() {
+            @Override
+            public void run() {
+                startTimeMillis.set(SystemClock.uptimeMillis());
+                executor.schedule(new Runnable() {
+                    @Override
+                    public void run() {
+                        // Mark execution time
+                        executionTimeMillis.set(SystemClock.uptimeMillis() - startTimeMillis.get());
+                        semaphore.release();
+                    }
+                }, DELAYED_TASK_DELAY_MILLIS, TimeUnit.MILLISECONDS);
+            }
+        };
+
+        // Start the runnable which will set the start time and post the delayed runnable
+        executor.execute(postDelayedTaskRunnable);
+
+        // Wait for the task to complete
+        semaphore.acquire();
+
+        assertThat(executionTimeMillis.get()).isAtLeast(DELAYED_TASK_DELAY_MILLIS);
+    }
+
+    @Test
+    @MediumTest
+    public void canCancelScheduledTask() throws InterruptedException {
+        final ScheduledExecutorService executor = CameraXExecutors.mainThreadExecutor();
+
+        final AtomicBoolean cancelledTaskRan = new AtomicBoolean(false);
+        final Semaphore semaphore = new Semaphore(0);
+        Runnable postMultipleDelayedRunnable = new Runnable() {
+            @Override
+            public void run() {
+                ScheduledFuture<?> futureToCancel = executor.schedule(new Runnable() {
+                    @Override
+                    public void run() {
+                        // This should not occur
+                        cancelledTaskRan.set(true);
+                    }
+                }, DELAYED_TASK_DELAY_MILLIS, TimeUnit.MILLISECONDS);
+
+                // Schedule after the time where the above runnable would have run if not cancelled.
+                executor.schedule(new Runnable() {
+                    @Override
+                    public void run() {
+                        // Allow test to finish
+                        semaphore.release();
+                    }
+                }, DELAYED_TASK_DELAY_MILLIS + 1, TimeUnit.MILLISECONDS);
+
+                // Cancel the first runnable. It should be impossible for it to have run since we
+                // are currently running on the execution thread.
+                futureToCancel.cancel(true);
+            }
+        };
+
+        // Post both to the thread that the runnables will be scheduled on to eliminate the
+        // chance of a race in cancelling the task.
+        executor.execute(postMultipleDelayedRunnable);
+
+        semaphore.acquire();
+
+        assertThat(cancelledTaskRan.get()).isFalse();
+    }
+
+    @Test
+    @MediumTest
+    public void canRetrieveValueFromFuture() throws ExecutionException, InterruptedException {
+        final ScheduledExecutorService executor = CameraXExecutors.mainThreadExecutor();
+        ScheduledFuture<Integer> future = executor.schedule(new Callable<Integer>() {
+            @Override
+            public Integer call() {
+                return MAGIC_VALUE;
+            }
+        }, DELAYED_TASK_DELAY_MILLIS, TimeUnit.MILLISECONDS);
+
+        assertThat(future.get()).isEqualTo(MAGIC_VALUE);
+    }
+
+    @Test(expected = ExecutionException.class)
+    @MediumTest
+    public void schedulingOnShutdownLooperReturnsException()
+            throws InterruptedException, ExecutionException {
+        final AtomicReference<ScheduledExecutorService> executor = new AtomicReference<>(null);
+        Thread thread = new Thread("schedulingOnShutdownLooperReturnsException_thread") {
+            @Override
+            public void run() {
+                Looper.prepare();
+
+                final ScheduledExecutorService currentExecutor =
+                        CameraXExecutors.myLooperExecutor();
+
+                currentExecutor.execute(new Runnable() {
+                    @Override
+                    public void run() {
+                        executor.set(currentExecutor);
+                        Looper.myLooper().quitSafely();
+                    }
+                });
+
+                Looper.loop();
+            }
+        };
+        // Run the thread to completion. Its only task quits the looper before the thread exits.
+        thread.start();
+        thread.join();
+        // schedule() on the quit looper should hand back a failed future, so get() must throw.
+        ScheduledFuture<?> future = executor.get().schedule(mock(Runnable.class),
+                DELAYED_TASK_DELAY_MILLIS, TimeUnit.MILLISECONDS);
+        future.get();
+    }
+}
diff --git a/camera/core/src/main/java/androidx/camera/core/impl/utils/executor/CameraXExecutors.java b/camera/core/src/main/java/androidx/camera/core/impl/utils/executor/CameraXExecutors.java
index c973c5b..1c71a01 100644
--- a/camera/core/src/main/java/androidx/camera/core/impl/utils/executor/CameraXExecutors.java
+++ b/camera/core/src/main/java/androidx/camera/core/impl/utils/executor/CameraXExecutors.java
@@ -22,6 +22,7 @@
 import androidx.annotation.NonNull;
 
 import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
 
 /**
  * Utility class for generating specific implementations of {@link Executor}.
@@ -32,8 +33,8 @@
     private CameraXExecutors() {
     }
 
-    /** Returns a cached {@link Executor} which posts to the main thread. */
-    public static Executor mainThreadExecutor() {
+    /** Returns a cached {@link ScheduledExecutorService} which posts to the main thread. */
+    public static ScheduledExecutorService mainThreadExecutor() {
         return MainThreadExecutor.getInstance();
     }
 
@@ -66,8 +67,8 @@
      * @return An executor which posts to the thread's current looper.
      * @throws IllegalStateException if the current thread does not have a looper.
      */
-    public static Executor myLooperExecutor() {
-        return HandlerAdapterExecutor.currentThreadExecutor();
+    public static ScheduledExecutorService myLooperExecutor() {
+        return HandlerScheduledExecutorService.currentThreadExecutor();
     }
 
     /**
@@ -75,7 +76,7 @@
      *
      * @return An executor which posts to the given handler.
      */
-    public static Executor newHandlerExecutor(@NonNull Handler handler) {
-        return new HandlerAdapterExecutor(handler);
+    public static ScheduledExecutorService newHandlerExecutor(@NonNull Handler handler) {
+        return new HandlerScheduledExecutorService(handler);
     }
 }
diff --git a/camera/core/src/main/java/androidx/camera/core/impl/utils/executor/HandlerAdapterExecutor.java b/camera/core/src/main/java/androidx/camera/core/impl/utils/executor/HandlerAdapterExecutor.java
deleted file mode 100644
index 0c00c49..0000000
--- a/camera/core/src/main/java/androidx/camera/core/impl/utils/executor/HandlerAdapterExecutor.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Copyright 2019 The Android Open Source Project
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package androidx.camera.core.impl.utils.executor;
-
-import android.os.Handler;
-import android.os.Looper;
-
-import androidx.annotation.NonNull;
-import androidx.core.util.Preconditions;
-
-import java.util.concurrent.Executor;
-import java.util.concurrent.RejectedExecutionException;
-
-/**
- * An implementation of {@link Executor} which delegates all tasks to the given {@link Handler}.
- */
-final class HandlerAdapterExecutor implements Executor {
-
-    private static ThreadLocal<Executor> sHandlerThreadLocal =
-            new ThreadLocal<Executor>() {
-                @Override
-                public Executor initialValue() {
-                    if (Looper.myLooper() == Looper.getMainLooper()) {
-                        return CameraXExecutors.mainThreadExecutor();
-                    } else if (Looper.myLooper() != null) {
-                        Handler handler = new Handler(Looper.myLooper());
-                        return new HandlerAdapterExecutor(handler);
-                    }
-
-                    return null;
-                }
-            };
-    private final Handler mHandler;
-
-    HandlerAdapterExecutor(@NonNull Handler handler) {
-        mHandler = Preconditions.checkNotNull(handler);
-    }
-
-    /**
-     * Retrieves a cached executor derived from the current thread's looper.
-     */
-    static Executor currentThreadExecutor() {
-        Executor executor = sHandlerThreadLocal.get();
-        if (executor == null) {
-            Looper looper = Looper.myLooper();
-            if (looper == null) {
-                throw new IllegalStateException("Current thread has no looper!");
-            }
-
-            executor = new HandlerAdapterExecutor(new Handler(looper));
-            sHandlerThreadLocal.set(executor);
-        }
-
-        return executor;
-    }
-
-    @Override
-    public void execute(Runnable command) {
-        if (!mHandler.post(command)) {
-            throw new RejectedExecutionException(mHandler + " is shutting down");
-        }
-    }
-}
diff --git a/camera/core/src/main/java/androidx/camera/core/impl/utils/executor/HandlerScheduledExecutorService.java b/camera/core/src/main/java/androidx/camera/core/impl/utils/executor/HandlerScheduledExecutorService.java
new file mode 100644
index 0000000..8644b6a
--- /dev/null
+++ b/camera/core/src/main/java/androidx/camera/core/impl/utils/executor/HandlerScheduledExecutorService.java
@@ -0,0 +1,276 @@
+/*
+ * Copyright 2019 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package androidx.camera.core.impl.utils.executor;
+
+import android.os.Handler;
+import android.os.Looper;
+import android.os.SystemClock;
+
+import androidx.annotation.NonNull;
+import androidx.camera.core.impl.utils.futures.Futures;
+import androidx.concurrent.futures.CallbackToFutureAdapter;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+import java.util.List;
+import java.util.concurrent.AbstractExecutorService;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RunnableScheduledFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * An implementation of {@link ScheduledExecutorService} which delegates all scheduled tasks to
+ * the given {@link Handler}.
+ *
+ * <p>Currently, can only be used to schedule future non-repeating tasks.
+ */
+final class HandlerScheduledExecutorService extends AbstractExecutorService implements
+        ScheduledExecutorService {
+
+    private static ThreadLocal<ScheduledExecutorService> sThreadLocalInstance =
+            new ThreadLocal<ScheduledExecutorService>() {
+                @Override
+                public ScheduledExecutorService initialValue() {
+                    if (Looper.myLooper() == Looper.getMainLooper()) {
+                        return CameraXExecutors.mainThreadExecutor();
+                    } else if (Looper.myLooper() != null) {
+                        Handler handler = new Handler(Looper.myLooper());
+                        return new HandlerScheduledExecutorService(handler);
+                    }
+
+                    return null;
+                }
+            };
+
+    private final Handler mHandler;
+
+    HandlerScheduledExecutorService(@NonNull Handler handler) {
+        mHandler = handler;
+    }
+
+    /**
+     * Retrieves a cached executor derived from the current thread's looper.
+     */
+    static ScheduledExecutorService currentThreadExecutor() {
+        ScheduledExecutorService executor = sThreadLocalInstance.get();
+        if (executor == null) {
+            Looper looper = Looper.myLooper();
+            if (looper == null) {
+                throw new IllegalStateException("Current thread has no looper!");
+            }
+
+            executor = new HandlerScheduledExecutorService(new Handler(looper));
+            sThreadLocalInstance.set(executor);
+        }
+
+        return executor;
+    }
+
+    @Override
+    public ScheduledFuture<?> schedule(
+            @NonNull final Runnable command,
+            long delay,
+            @NonNull TimeUnit unit) {
+        Callable<Void> wrapper = new Callable<Void>() {
+            @Override
+            public Void call() {
+                command.run();
+                return null;
+            }
+        };
+        return schedule(wrapper, delay, unit);
+    }
+
+    @Override
+    @NonNull
+    public <V> ScheduledFuture<V> schedule(
+            @NonNull Callable<V> callable,
+            long delay,
+            @NonNull TimeUnit unit) {
+        long runAtMillis = SystemClock.uptimeMillis() + TimeUnit.MILLISECONDS.convert(delay, unit);
+        HandlerScheduledFuture<V> future = new HandlerScheduledFuture<>(mHandler, runAtMillis,
+                callable);
+        if (mHandler.postAtTime(future, runAtMillis)) {
+            return future;
+        }
+
+        return Futures.immediateFailedScheduledFuture(createPostFailedException());
+    }
+
+    @Override
+    @NonNull
+    public ScheduledFuture<?> scheduleAtFixedRate(
+            @NonNull Runnable command,
+            long initialDelay,
+            long period,
+            @NonNull TimeUnit unit) {
+        throw new UnsupportedOperationException(
+                HandlerScheduledExecutorService.class.getSimpleName()
+                        + " does not yet support fixed-rate scheduling.");
+    }
+
+    @Override
+    @NonNull
+    public ScheduledFuture<?> scheduleWithFixedDelay(@NonNull Runnable command, long initialDelay,
+            long delay, @NonNull TimeUnit unit) {
+        throw new UnsupportedOperationException(
+                HandlerScheduledExecutorService.class.getSimpleName()
+                        + " does not yet support fixed-delay scheduling.");
+    }
+
+    @Override
+    public void shutdown() {
+        throw new UnsupportedOperationException(
+                HandlerScheduledExecutorService.class.getSimpleName()
+                        + " cannot be shut down. Use Looper.quitSafely().");
+    }
+
+    @Override
+    @NonNull
+    public List<Runnable> shutdownNow() {
+        throw new UnsupportedOperationException(
+                HandlerScheduledExecutorService.class.getSimpleName()
+                        + " cannot be shut down. Use Looper.quitSafely().");
+    }
+
+    @Override
+    public boolean isShutdown() {
+        return false;
+    }
+
+    @Override
+    public boolean isTerminated() {
+        return false;
+    }
+
+    @Override
+    public boolean awaitTermination(long timeout, @NonNull TimeUnit unit) {
+        throw new UnsupportedOperationException(
+                HandlerScheduledExecutorService.class.getSimpleName()
+                        + " cannot be shut down. Use Looper.quitSafely().");
+    }
+
+    @Override
+    public void execute(@NonNull Runnable command) {
+        if (!mHandler.post(command)) {
+            throw createPostFailedException();
+        }
+    }
+
+    private RejectedExecutionException createPostFailedException() {
+        return new RejectedExecutionException(mHandler + " is shutting down");
+    }
+
+    private static class HandlerScheduledFuture<V> implements RunnableScheduledFuture<V> {
+
+        final AtomicReference<CallbackToFutureAdapter.Completer<V>>
+                mCompleter = new AtomicReference<>(null);
+        private final long mRunAtMillis;
+        private final Callable<V> mTask;
+        private final ListenableFuture<V> mDelegate;
+
+        HandlerScheduledFuture(final Handler handler, long runAtMillis, final Callable<V> task) {
+            mRunAtMillis = runAtMillis;
+            mTask = task;
+            mDelegate = CallbackToFutureAdapter.getFuture(
+                    new CallbackToFutureAdapter.Resolver<V>() {
+
+                        @Override
+                        public Object attachCompleter(
+                                @NonNull CallbackToFutureAdapter.Completer<V> completer) throws
+                                RejectedExecutionException {
+
+                            completer.addCancellationListener(new Runnable() {
+                                @Override
+                                public void run() {
+                                    // Remove the completer if we're cancelled so the task won't
+                                    // run.
+                                    if (mCompleter.getAndSet(null) != null) {
+                                        handler.removeCallbacks(HandlerScheduledFuture.this);
+                                    }
+                                }
+                            }, CameraXExecutors.directExecutor());
+
+                            mCompleter.set(completer);
+                            return "HandlerScheduledFuture-" + task.toString();
+                        }
+                    });
+        }
+
+        @Override
+        public boolean isPeriodic() {
+            return false;
+        }
+
+        @Override
+        public long getDelay(TimeUnit unit) {
+            // mRunAtMillis is on the SystemClock.uptimeMillis() timebase, so measure against it.
+            return unit.convert(mRunAtMillis - SystemClock.uptimeMillis(), TimeUnit.MILLISECONDS);
+        }
+
+        @Override
+        public int compareTo(Delayed o) {
+            return Long.compare(getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
+        }
+
+        @Override
+        public void run() {
+            // If completer is null, it has already run or is cancelled.
+            CallbackToFutureAdapter.Completer<V> completer = mCompleter.getAndSet(null);
+            if (completer != null) {
+                try {
+                    completer.set(mTask.call());
+                } catch (Exception e) {
+                    completer.setException(e);
+                }
+            }
+        }
+
+        @Override
+        public boolean cancel(boolean mayInterruptIfRunning) {
+            return mDelegate.cancel(mayInterruptIfRunning);
+        }
+
+        @Override
+        public boolean isCancelled() {
+            return mDelegate.isCancelled();
+        }
+
+        @Override
+        public boolean isDone() {
+            return mDelegate.isDone();
+        }
+
+        @Override
+        public V get() throws ExecutionException, InterruptedException {
+            return mDelegate.get();
+        }
+
+        @Override
+        public V get(long timeout, @NonNull TimeUnit unit)
+                throws ExecutionException, InterruptedException, TimeoutException {
+            return mDelegate.get(timeout, unit);
+        }
+    }
+}
diff --git a/camera/core/src/main/java/androidx/camera/core/impl/utils/executor/MainThreadExecutor.java b/camera/core/src/main/java/androidx/camera/core/impl/utils/executor/MainThreadExecutor.java
index 225a3f3..977c9d9 100644
--- a/camera/core/src/main/java/androidx/camera/core/impl/utils/executor/MainThreadExecutor.java
+++ b/camera/core/src/main/java/androidx/camera/core/impl/utils/executor/MainThreadExecutor.java
@@ -20,25 +20,32 @@
 import android.os.Looper;
 
 import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
 
 /**
- * Helper class for retrieving an {@link Executor} which will post to the main thread.
+ * Helper class for retrieving a {@link ScheduledExecutorService} which will post to the main
+ * thread.
+ *
+ * <p>Since {@link ScheduledExecutorService} extends {@link Executor}, this can also be used
+ * as a simple Executor.
  */
 final class MainThreadExecutor {
-    private static volatile Executor sExecutor;
+    private static volatile ScheduledExecutorService sInstance;
 
-    private MainThreadExecutor() {}
+    private MainThreadExecutor() {
+    }
 
-    static Executor getInstance() {
-        if (sExecutor != null) {
-            return sExecutor;
+    static ScheduledExecutorService getInstance() {
+        if (sInstance != null) {
+            return sInstance;
         }
         synchronized (MainThreadExecutor.class) {
-            if (sExecutor == null) {
-                sExecutor = new HandlerAdapterExecutor(new Handler(Looper.getMainLooper()));
+            if (sInstance == null) {
+                sInstance = new HandlerScheduledExecutorService(
+                        new Handler(Looper.getMainLooper()));
             }
         }
 
-        return sExecutor;
+        return sInstance;
     }
 }
diff --git a/camera/core/src/main/java/androidx/camera/core/impl/utils/futures/Futures.java b/camera/core/src/main/java/androidx/camera/core/impl/utils/futures/Futures.java
index d941c69..2bab1e5 100644
--- a/camera/core/src/main/java/androidx/camera/core/impl/utils/futures/Futures.java
+++ b/camera/core/src/main/java/androidx/camera/core/impl/utils/futures/Futures.java
@@ -29,6 +29,7 @@
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledFuture;
 
 /**
  * Utility class for generating specific implementations of {@link ListenableFuture}.
@@ -42,6 +43,7 @@
      * @param <V>   The type of the result.
      * @return A future which immediately contains the result.
      */
+    @NonNull
     public static <V> ListenableFuture<V> immediateFuture(@Nullable V value) {
         if (value == null) {
             return ImmediateFuture.nullFuture();
@@ -59,11 +61,26 @@
      * @param <V>   The type of the result.
      * @return A future which immediately contains an exception.
      */
+    @NonNull
     public static <V> ListenableFuture<V> immediateFailedFuture(@NonNull Throwable cause) {
         return new ImmediateFuture.ImmediateFailedFuture<>(cause);
     }
 
     /**
+     * Returns an implementation of {@link ScheduledFuture} which immediately contains an
+     * exception that will be thrown by {@link Future#get()}.
+     *
+     * @param cause The cause of the {@link ExecutionException} that will be thrown by
+     * {@link Future#get()}.
+     * @param <V>   The type of the result.
+     * @return A future which immediately contains an exception.
+     */
+    @NonNull
+    public static <V> ScheduledFuture<V> immediateFailedScheduledFuture(@NonNull Throwable cause) {
+        return new ImmediateFuture.ImmediateFailedScheduledFuture<>(cause);
+    }
+
+    /**
      * Returns a new {@code Future} whose result is asynchronously derived from the result
      * of the given {@code Future}. If the given {@code Future} fails, the returned {@code Future}
      * fails with the same exception (and the function is not invoked).
@@ -75,10 +92,11 @@
      * @return A future that holds result of the function (if the input succeeded) or the original
      *     input's failure (if not)
      */
+    @NonNull
     public static <I, O> ListenableFuture<O> transformAsync(
-            ListenableFuture<I> input,
-            AsyncFunction<? super I, ? extends O> function,
-            Executor executor) {
+            @NonNull ListenableFuture<I> input,
+            @NonNull AsyncFunction<? super I, ? extends O> function,
+            @NonNull Executor executor) {
         return AbstractTransformFuture.create(input, function, executor);
     }
 
@@ -93,9 +111,10 @@
      * @param executor Executor to run the function in.
      * @return A future that holds result of the transformation.
      */
+    @NonNull
     public static <I, O> ListenableFuture<O> transform(
-            ListenableFuture<I> input, Function<? super I, ? extends O> function,
-            Executor executor) {
+            @NonNull ListenableFuture<I> input, @NonNull Function<? super I, ? extends O> function,
+            @NonNull Executor executor) {
         return AbstractTransformFuture.create(input, function, executor);
     }
 
@@ -111,8 +130,9 @@
      * @param futures futures to combine
      * @return a future that provides a list of the results of the component futures
      */
+    @NonNull
     public static <V> ListenableFuture<List<V>> successfulAsList(
-            Collection<? extends ListenableFuture<? extends V>> futures) {
+            @NonNull Collection<? extends ListenableFuture<? extends V>> futures) {
         return new CollectionFuture.ListFuture<V>(futures, false);
     }
 
@@ -128,8 +148,9 @@
      * @param futures futures to combine
      * @return a future that provides a list of the results of the component futures
      */
+    @NonNull
     public static <V> ListenableFuture<List<V>> allAsList(
-            Collection<? extends ListenableFuture<? extends V>> futures) {
+            @NonNull Collection<? extends ListenableFuture<? extends V>> futures) {
         return new CollectionFuture.ListFuture<V>(futures, true);
     }
 
@@ -143,9 +164,9 @@
      * @param executor The executor to run {@code callback} when the future completes.
      */
     public static <V> void addCallback(
-            final ListenableFuture<V> future,
-            final FutureCallback<? super V> callback,
-            Executor executor) {
+            @NonNull final ListenableFuture<V> future,
+            @NonNull final FutureCallback<? super V> callback,
+            @NonNull Executor executor) {
         Preconditions.checkNotNull(callback);
         future.addListener(new CallbackListener<V>(future, callback), executor);
     }
@@ -194,7 +215,8 @@
      * @throws CancellationException if the {@code Future} was cancelled
      * @throws IllegalStateException if the {@code Future} is not done
      */
-    public static <V> V getDone(Future<V> future) throws ExecutionException {
+    @Nullable
+    public static <V> V getDone(@NonNull Future<V> future) throws ExecutionException {
         /*
          * We throw IllegalStateException, since the call could succeed later. Perhaps we
          * "should" throw IllegalArgumentException, since the call could succeed with a different
@@ -211,12 +233,13 @@
     }
 
     /**
-     * Invokes {@code mFuture.}{@link Future#get() get()} uninterruptibly.
+     * Invokes {@code future.}{@link Future#get() get()} uninterruptibly.
      *
      * @throws ExecutionException if the computation threw an exception
      * @throws CancellationException if the computation was cancelled
      */
-    public static <V> V getUninterruptibly(Future<V> future) throws ExecutionException {
+    @Nullable
+    public static <V> V getUninterruptibly(@NonNull Future<V> future) throws ExecutionException {
         boolean interrupted = false;
         try {
             while (true) {
diff --git a/camera/core/src/main/java/androidx/camera/core/impl/utils/futures/ImmediateFuture.java b/camera/core/src/main/java/androidx/camera/core/impl/utils/futures/ImmediateFuture.java
index 3be6f73..1aa93cc 100644
--- a/camera/core/src/main/java/androidx/camera/core/impl/utils/futures/ImmediateFuture.java
+++ b/camera/core/src/main/java/androidx/camera/core/impl/utils/futures/ImmediateFuture.java
@@ -24,8 +24,10 @@
 
 import com.google.common.util.concurrent.ListenableFuture;
 
+import java.util.concurrent.Delayed;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -51,7 +53,7 @@
     }
 
     @Override
-    public void addListener(Runnable listener, Executor executor) {
+    public void addListener(@NonNull Runnable listener, @NonNull Executor executor) {
         Preconditions.checkNotNull(listener);
         Preconditions.checkNotNull(executor);
 
@@ -87,7 +89,7 @@
 
     @Override
     @Nullable
-    public V get(long timeout, TimeUnit unit) throws ExecutionException {
+    public V get(long timeout, @NonNull TimeUnit unit) throws ExecutionException {
         Preconditions.checkNotNull(unit);
         return get();
     }
@@ -118,9 +120,9 @@
         }
     }
 
-    static final class ImmediateFailedFuture<V> extends ImmediateFuture<V> {
+    static class ImmediateFailedFuture<V> extends ImmediateFuture<V> {
 
-        @Nullable
+        @NonNull
         private final Throwable mCause;
 
         ImmediateFailedFuture(@NonNull Throwable cause) {
@@ -134,9 +136,44 @@
         }
 
         @Override
+        @NonNull
         public String toString() {
             // Behaviour analogous to AbstractResolvableFuture#toString().
             return super.toString() + "[status=FAILURE, cause=[" + mCause + "]]";
         }
     }
+
+    /**
+     * A future which immediately contains an exception and also implements
+     * {@link ScheduledFuture}, so it can be returned where a scheduled future is required.
+     *
+     * <p>The remaining delay is always reported as zero.
+     *
+     * @param <V> The type of the result.
+     */
+    static final class ImmediateFailedScheduledFuture<V> extends ImmediateFailedFuture<V> implements
+            ScheduledFuture<V> {
+
+        ImmediateFailedScheduledFuture(@NonNull Throwable cause) {
+            super(cause);
+        }
+
+        /** Always returns zero, regardless of the requested unit. */
+        @Override
+        public long getDelay(@NonNull TimeUnit timeUnit) {
+            return 0;
+        }
+
+        /**
+         * Compares remaining delays, treating this future's delay as zero.
+         *
+         * <p>{@link Delayed} requires an ordering consistent with {@code getDelay}.
+         */
+        @Override
+        public int compareTo(@NonNull Delayed delayed) {
+            // Comparing against our fixed delay of zero keeps compareTo(this) == 0 and the sign
+            // antisymmetric; a constant -1 would violate the Comparable contract.
+            return Long.compare(0L, delayed.getDelay(TimeUnit.NANOSECONDS));
+        }
+    }
 }