Skip to content

Commit

Permalink
Eagerly fetch GoogleCloudStorageReadChannel metadata if 'fs.gs.inputs…
Browse files Browse the repository at this point in the history
…tream.fast.fail.on.not.found.enable' is true
  • Loading branch information
medb committed Feb 25, 2019
1 parent f81d823 commit 8f6443b
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 5 deletions.
3 changes: 3 additions & 0 deletions gcs/CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
1. Fix bug where the GCS connector lists all files in a directory instead of
honoring the specified limit.

2. Eagerly initialize `GoogleCloudStorageReadChannel` metadata if
`fs.gs.inputstream.fast.fail.on.not.found.enable` is set to true.


1.9.15 - 2019-02-21

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.google.cloud.hadoop.gcsio;

import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.createItemInfoForStorageObject;
import static com.google.cloud.hadoop.gcsio.StorageResourceId.createReadableString;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
Expand All @@ -36,6 +37,8 @@
import com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadOptions.GenerationReadConsistency;
import com.google.cloud.hadoop.util.ApiErrorExtractor;
import com.google.cloud.hadoop.util.ClientRequestHelper;
import com.google.cloud.hadoop.util.ResilientOperation;
import com.google.cloud.hadoop.util.RetryDeterminer;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
Expand Down Expand Up @@ -289,8 +292,28 @@ ExponentialBackOff createBackOff() {
* read.
*/
@Nullable
protected GoogleCloudStorageItemInfo getInitialMetadata() {
return null;
protected GoogleCloudStorageItemInfo getInitialMetadata() throws IOException {
  // Metadata is fetched eagerly only when fast-fail-on-not-found is enabled; otherwise return
  // null before any request is built.
  if (!readOptions.getFastFailOnNotFound()) {
    return null;
  }
  Storage.Objects.Get getObject = gcs.objects().get(bucketName, objectName);
  StorageObject object;
  try {
    // Execute the metadata request, retrying on socket errors with the read back-off.
    object =
        ResilientOperation.retry(
            ResilientOperation.getGoogleRequestCallable(getObject),
            readBackOff.get(),
            RetryDeterminer.SOCKET_ERRORS,
            IOException.class,
            sleeper);
  } catch (IOException e) {
    // Report a missing object as a file-not-found error; wrap every other failure with the
    // resource identifier for context.
    throw errorExtractor.itemNotFound(e)
        ? GoogleCloudStorageExceptions.getFileNotFoundException(bucketName, objectName)
        : new IOException("Error reading " + resourceIdString, e);
  } catch (InterruptedException e) { // From the sleep
    // Restore the interrupt flag: converting to IOException would otherwise hide the
    // interruption from callers further up the stack.
    Thread.currentThread().interrupt();
    throw new IOException("Thread interrupt received.", e);
  }
  return createItemInfoForStorageObject(new StorageResourceId(bucketName, objectName), object);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ public synchronized SeekableByteChannel getReadChannel(GoogleCloudStorageReadOpt
return new InMemoryObjectReadChannel(completedContents, readOptions) {
@Nullable
@Override
protected GoogleCloudStorageItemInfo getInitialMetadata() {
protected GoogleCloudStorageItemInfo getInitialMetadata() throws IOException {
  // With fast-fail enabled, return the result of getInfo() directly; otherwise defer to the
  // superclass implementation.
  if (readOptions.getFastFailOnNotFound()) {
    return getInfo();
  }
  return super.getInitialMetadata();
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,28 @@

package com.google.cloud.hadoop.gcsio;

import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageTestUtils.BUCKET_NAME;
import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageTestUtils.JSON_FACTORY;
import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageTestUtils.OBJECT_NAME;
import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageTestUtils.createReadChannel;
import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageTestUtils.dataRangeResponse;
import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageTestUtils.dataResponse;
import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageTestUtils.metadataResponse;
import static com.google.common.truth.Truth.assertThat;
import static java.util.stream.Collectors.toList;

import com.google.api.client.http.HttpRequest;
import com.google.api.client.testing.http.MockHttpTransport;
import com.google.api.client.util.DateTime;
import com.google.api.services.storage.Storage;
import com.google.api.services.storage.model.StorageObject;
import com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadOptions.Fadvise;
import java.io.IOException;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand All @@ -38,6 +45,49 @@
@RunWith(JUnit4.class)
public class GoogleCloudStorageReadChannelTest {

@Test
public void metadataInitialization_eager() throws IOException {
  // Canned metadata for a 123-byte object; the mock transport is set up to return it.
  StorageObject objectMetadata =
      new StorageObject()
          .setBucket(BUCKET_NAME)
          .setName(OBJECT_NAME)
          .setSize(new BigInteger("123"))
          .setGeneration(1L)
          .setMetageneration(1L)
          .setUpdated(new DateTime(new Date()));
  MockHttpTransport mockTransport =
      GoogleCloudStorageTestUtils.mockTransport(metadataResponse(objectMetadata));

  // Every request initialized through the Storage client is recorded in this list.
  List<HttpRequest> recordedRequests = new ArrayList<>();
  Storage gcs = new Storage(mockTransport, JSON_FACTORY, recordedRequests::add);

  GoogleCloudStorageReadOptions fastFailOptions =
      GoogleCloudStorageReadOptions.builder().setFastFailOnNotFound(true).build();

  GoogleCloudStorageReadChannel channel = createReadChannel(gcs, fastFailOptions);

  // With fast-fail on, the size is known up front and exactly one request was recorded.
  assertThat(channel.size()).isEqualTo(123);
  assertThat(recordedRequests).hasSize(1);
}

@Test
public void metadataInitialization_lazy() throws IOException {
  // No canned responses are registered with the mock transport.
  MockHttpTransport mockTransport = GoogleCloudStorageTestUtils.mockTransport();

  // Every request initialized through the Storage client is recorded in this list.
  List<HttpRequest> recordedRequests = new ArrayList<>();
  Storage gcs = new Storage(mockTransport, JSON_FACTORY, recordedRequests::add);

  GoogleCloudStorageReadOptions lazyOptions =
      GoogleCloudStorageReadOptions.builder().setFastFailOnNotFound(false).build();

  GoogleCloudStorageReadChannel channel = createReadChannel(gcs, lazyOptions);

  // With fast-fail off, the size is reported as -1 and no request was recorded.
  assertThat(channel.size()).isEqualTo(-1);
  assertThat(recordedRequests).isEmpty();
}

@Test
public void fadviseAuto_onForwardRead_switchesToRandom() throws IOException {
int seekPosition = 5;
Expand All @@ -57,6 +107,7 @@ public void fadviseAuto_onForwardRead_switchesToRandom() throws IOException {

GoogleCloudStorageReadOptions options =
GoogleCloudStorageReadOptions.builder()
.setFastFailOnNotFound(false)
.setFadvise(Fadvise.AUTO)
.setMinRangeRequestSize(1)
.setInplaceSeekLimit(2)
Expand Down Expand Up @@ -102,6 +153,7 @@ public void fadviseAuto_onBackwardRead_switchesToRandom() throws IOException {

GoogleCloudStorageReadOptions options =
GoogleCloudStorageReadOptions.builder()
.setFastFailOnNotFound(false)
.setFadvise(Fadvise.AUTO)
.setMinRangeRequestSize(1)
.build();
Expand Down Expand Up @@ -148,6 +200,7 @@ public void footerPrefetch_reused() throws IOException {

GoogleCloudStorageReadOptions options =
GoogleCloudStorageReadOptions.builder()
.setFastFailOnNotFound(false)
.setFadvise(Fadvise.RANDOM)
.setMinRangeRequestSize(footeSize)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ public final class GoogleCloudStorageTestUtils {

public static final JacksonFactory JSON_FACTORY = JacksonFactory.getDefaultInstance();

private static final String BUCKET_NAME = "foo-bucket";
private static final String OBJECT_NAME = "bar-object";
static final String BUCKET_NAME = "foo-bucket";
static final String OBJECT_NAME = "bar-object";

private static final ApiErrorExtractor ERROR_EXTRACTOR = ApiErrorExtractor.INSTANCE;
private static final ClientRequestHelper<StorageObject> REQUEST_HELPER =
Expand Down

0 comments on commit 8f6443b

Please sign in to comment.