From e43bb74c3af67a8ac2d52a17aee91a47d735c027 Mon Sep 17 00:00:00 2001 From: Vishesh Garg Date: Thu, 14 Dec 2023 07:34:49 +0530 Subject: [PATCH] Add MSQ Durable Storage Connector for Google Cloud Storage and change current Google Cloud Storage client library (#15398) The PR addresses 2 things: Add MSQ durable storage connector for GCS Change GCS client library from the old Google API Client Library to the recommended Google Cloud Client Library. Ref: https://cloud.google.com/apis/docs/client-libraries-explained --- distribution/bin/check-licenses.py | 1 + docs/multi-stage-query/reference.md | 8 +- docs/operations/durable-storage.md | 2 +- extensions-core/google-extensions/pom.xml | 22 +- .../google/GoogleCloudStorageInputSource.java | 15 +- .../storage/google/GoogleByteSource.java | 4 +- .../google/GoogleDataSegmentPuller.java | 2 +- .../druid/storage/google/GoogleStorage.java | 208 ++++++++++++-- .../google/GoogleStorageDruidModule.java | 18 +- .../google/GoogleStorageObjectMetadata.java | 96 +++++++ .../google/GoogleStorageObjectPage.java | 51 ++++ .../druid/storage/google/GoogleTaskLogs.java | 2 +- .../GoogleTimestampVersionedDataFinder.java | 24 +- .../druid/storage/google/GoogleUtils.java | 21 +- .../storage/google/ObjectStorageIterator.java | 66 ++--- .../google/output/GoogleInputRange.java | 91 ++++++ .../google/output/GoogleOutputConfig.java | 147 ++++++++++ .../google/output/GoogleStorageConnector.java | 216 +++++++++++++++ .../output/GoogleStorageConnectorModule.java | 44 +++ .../GoogleStorageConnectorProvider.java | 64 +++++ ...rg.apache.druid.initialization.DruidModule | 3 +- .../GoogleCloudStorageInputSourceTest.java | 79 ++++-- .../storage/google/GoogleByteSourceTest.java | 4 +- .../google/GoogleDataSegmentKillerTest.java | 50 +--- .../google/GoogleDataSegmentPullerTest.java | 4 +- .../google/GoogleStorageDruidModuleTest.java | 43 +-- .../storage/google/GoogleStorageTest.java | 262 +++++++++++++++--- .../storage/google/GoogleTaskLogsTest.java | 106 ++----- .../druid/storage/google/GoogleTestUtils.java | 71 ++--- ...oogleTimestampVersionedDataFinderTest.java | 34 ++- .../google/ObjectStorageIteratorTest.java | 105 ++----- .../google/output/GoogleInputRangeTest.java | 34 +++ .../google/output/GoogleOutputConfigTest.java | 49 ++++ .../GoogleStorageConnectorProviderTest.java | 152 ++++++++++ .../output/GoogleStorageConnectorTest.java | 210 ++++++++++++++ integration-tests-ex/cases/pom.xml | 29 +- .../druid/testsEx/utils/GcsTestUtil.java | 7 +- licenses.yaml | 197 ++++++++++++- pom.xml | 2 +- 39 files changed, 1996 insertions(+), 547 deletions(-) create mode 100644 extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleStorageObjectMetadata.java create mode 100644 extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleStorageObjectPage.java create mode 100644 extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleInputRange.java create mode 100644 extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleOutputConfig.java create mode 100644 extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleStorageConnector.java create mode 100644 extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleStorageConnectorModule.java create mode 100644 extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleStorageConnectorProvider.java create mode 100644 extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/output/GoogleInputRangeTest.java create mode 100644 extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/output/GoogleOutputConfigTest.java create mode 100644 extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/output/GoogleStorageConnectorProviderTest.java create mode 100644 extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/output/GoogleStorageConnectorTest.java diff --git a/distribution/bin/check-licenses.py b/distribution/bin/check-licenses.py index cf6e7e35b661..70afd9031362 100755 --- a/distribution/bin/check-licenses.py +++ b/distribution/bin/check-licenses.py @@ -258,6 +258,7 @@ def build_compatible_license_names(): compatible_licenses['The BSD 3-Clause License'] = 'BSD-3-Clause License' compatible_licenses['Revised BSD'] = 'BSD-3-Clause License' compatible_licenses['New BSD License'] = 'BSD-3-Clause License' + compatible_licenses['BSD New license'] = 'BSD-3-Clause License' compatible_licenses['3-Clause BSD License'] = 'BSD-3-Clause License' compatible_licenses['BSD 3-Clause'] = 'BSD-3-Clause License' compatible_licenses['BSD-3-Clause'] = 'BSD-3-Clause License' diff --git a/docs/multi-stage-query/reference.md b/docs/multi-stage-query/reference.md index d71c58abbd1b..d74ef85f50e4 100644 --- a/docs/multi-stage-query/reference.md +++ b/docs/multi-stage-query/reference.md @@ -356,7 +356,7 @@ SQL-based ingestion supports using durable storage to store intermediate files t ### Durable storage configurations -Durable storage is supported on Amazon S3 storage and Microsoft's Azure Blob Storage. +Durable storage is supported on Amazon S3 storage, Microsoft's Azure Blob Storage and Google Cloud Storage. There are common configurations that control the behavior regardless of which storage service you use. Apart from these common configurations, there are a few properties specific to S3 and to Azure. Common properties to configure the behavior of durable storage @@ -364,16 +364,16 @@ Common properties to configure the behavior of durable storage |Parameter | Required | Description | Default | |--|--|--| |`druid.msq.intermediate.storage.enable` | Yes | Whether to enable durable storage for the cluster. Set it to true to enable durable storage. For more information about enabling durable storage, see [Durable storage](../operations/durable-storage.md). | false | -|`druid.msq.intermediate.storage.type` | Yes | The type of storage to use. Set it to `s3` for S3 and `azure` for Azure | n/a | +|`druid.msq.intermediate.storage.type` | Yes | The type of storage to use. Set it to `s3` for S3, `azure` for Azure and `google` for Google | n/a | |`druid.msq.intermediate.storage.tempDir`| Yes | Directory path on the local disk to store temporary files required while uploading and downloading the data | n/a | |`druid.msq.intermediate.storage.maxRetry` | No | Defines the max number times to attempt S3 API calls to avoid failures due to transient errors. | 10 | |`druid.msq.intermediate.storage.chunkSize` | No | Defines the size of each chunk to temporarily store in `druid.msq.intermediate.storage.tempDir`. The chunk size must be between 5 MiB and 5 GiB. A large chunk size reduces the API calls made to the durable storage, however it requires more disk space to store the temporary chunks. Druid uses a default of 100MiB if the value is not provided.| 100MiB | -To use S3 for durable storage, you also need to configure the following properties: +To use S3 or Google for durable storage, you also need to configure the following properties: |Parameter | Required | Description | Default | |-------------------|----------------------------------------|----------------------| --| -|`druid.msq.intermediate.storage.bucket` | Yes | The S3 bucket where the files are uploaded to and download from | n/a | +|`druid.msq.intermediate.storage.bucket` | Yes | The S3 or Google bucket where the files are uploaded to and download from | n/a | |`druid.msq.intermediate.storage.prefix` | Yes | Path prepended to all the paths uploaded to the bucket to namespace the connector's files. Provide a unique value for the prefix and do not share the same prefix between different clusters. If the location includes other files or directories, then they might get cleaned up as well. | n/a | To use Azure for durable storage, you also need to configure the following properties: diff --git a/docs/operations/durable-storage.md b/docs/operations/durable-storage.md index b7a8ad1ef905..78c5fe3765a6 100644 --- a/docs/operations/durable-storage.md +++ b/docs/operations/durable-storage.md @@ -25,7 +25,7 @@ sidebar_label: "Durable storage" You can use durable storage to improve querying from deep storage and SQL-based ingestion. -> Note that only S3 is supported as a durable storage location. +> Note that S3, Azure and Google are all supported as durable storage locations. Durable storage for queries from deep storage provides a location where you can write the results of deep storage queries to. Durable storage for SQL-based ingestion is used to temporarily house intermediate files, which can improve reliability. diff --git a/extensions-core/google-extensions/pom.xml b/extensions-core/google-extensions/pom.xml index cb3c72d1b954..b13428242ea3 100644 --- a/extensions-core/google-extensions/pom.xml +++ b/extensions-core/google-extensions/pom.xml @@ -48,15 +48,9 @@ - com.google.apis - google-api-services-storage - ${com.google.apis.storage.version} - - - com.google.api-client - google-api-client - - + com.google.cloud + google-cloud-storage + ${com.google.cloud.storage.version} commons-io @@ -125,6 +119,16 @@ 2.0.1 provided + + com.google.api + gax + 2.37.0 + + + com.google.cloud + google-cloud-core + 2.27.0 + org.apache.druid diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSource.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSource.java index 5b9cea5d9c60..b99e8a36df94 100644 --- a/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSource.java +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSource.java @@ -23,7 +23,6 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.api.services.storage.model.StorageObject; import com.google.common.collect.Iterators; import org.apache.druid.data.input.InputEntity; import org.apache.druid.data.input.InputSplit; @@ -37,12 +36,12 @@ import org.apache.druid.storage.google.GoogleInputDataConfig; import org.apache.druid.storage.google.GoogleStorage; import org.apache.druid.storage.google.GoogleStorageDruidModule; +import org.apache.druid.storage.google.GoogleStorageObjectMetadata; import org.apache.druid.storage.google.GoogleUtils; import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.io.IOException; -import java.math.BigInteger; import java.net.URI; import java.util.Collections; import java.util.Iterator; @@ -139,7 +138,7 @@ public Iterator getDescriptorIteratorForPrefixes(List pre @Override public long getObjectSize(CloudObjectLocation location) throws IOException { - final StorageObject storageObject = storage.getMetadata(location.getBucket(), location.getPath()); + final GoogleStorageObjectMetadata storageObject = storage.getMetadata(location.getBucket(), location.getPath()); return getSize(storageObject); } } @@ -147,15 +146,15 @@ public long getObjectSize(CloudObjectLocation location) throws IOException return new SplitWidget(); } - private static long getSize(final StorageObject object) + private static long getSize(final GoogleStorageObjectMetadata object) { - final BigInteger sizeInBigInteger = object.getSize(); + final Long sizeInLong = object.getSize(); - if (sizeInBigInteger == null) { + if (sizeInLong == null) { return Long.MAX_VALUE; } else { try { - return sizeInBigInteger.longValueExact(); + return sizeInLong; } catch (ArithmeticException e) { LOG.warn( @@ -164,7 +163,7 @@ private static long getSize(final StorageObject object) + "The max long value will be used for its size instead.", object.getBucket(), object.getName(), - sizeInBigInteger + sizeInLong ); return Long.MAX_VALUE; } diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleByteSource.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleByteSource.java index 977353f9f204..03554fc5c639 100644 --- a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleByteSource.java +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleByteSource.java @@ -51,12 +51,12 @@ public String getPath() @Override public InputStream openStream() throws IOException { - return storage.get(bucket, path); + return storage.getInputStream(bucket, path); } public InputStream openStream(long start) throws IOException { - return storage.get(bucket, path, start); + return storage.getInputStream(bucket, path, start); } @Override diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleDataSegmentPuller.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleDataSegmentPuller.java index fc3f7d371f44..61595264fc67 100644 --- a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleDataSegmentPuller.java +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleDataSegmentPuller.java @@ -83,7 +83,7 @@ FileUtils.FileCopyResult getSegmentFiles(final String bucket, final String path, public InputStream getInputStream(URI uri) throws IOException { String path = StringUtils.maybeRemoveLeadingSlash(uri.getPath()); - return storage.get(uri.getHost() != null ? uri.getHost() : uri.getAuthority(), path); + return storage.getInputStream(uri.getHost() != null ? uri.getHost() : uri.getAuthority(), path); } @Override diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleStorage.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleStorage.java index f181d08f443c..91d290b17856 100644 --- a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleStorage.java +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleStorage.java @@ -20,13 +20,26 @@ package org.apache.druid.storage.google; import com.google.api.client.http.AbstractInputStreamContent; -import com.google.api.services.storage.Storage; -import com.google.api.services.storage.Storage.Objects.Get; -import com.google.api.services.storage.model.StorageObject; +import com.google.api.gax.paging.Page; +import com.google.cloud.ReadChannel; +import com.google.cloud.WriteChannel; +import com.google.cloud.storage.Blob; +import com.google.cloud.storage.BlobId; +import com.google.cloud.storage.BlobInfo; +import com.google.cloud.storage.Storage; import com.google.common.base.Supplier; +import com.google.common.collect.Iterables; +import org.apache.druid.java.util.common.HumanReadableBytes; +import org.apache.druid.java.util.common.IOE; +import javax.annotation.Nullable; import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; +import java.nio.channels.Channels; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; public class GoogleStorage { @@ -36,69 +49,204 @@ public class GoogleStorage * if we have a Storage instead of a supplier of it, it can cause unnecessary config validation * against Google storage even when it's not used at all. To perform the config validation * only when it is actually used, we use a supplier. - * + *

* See OmniDataSegmentKiller for how DataSegmentKillers are initialized. */ private final Supplier storage; - public GoogleStorage(Supplier storage) + private final HumanReadableBytes DEFAULT_WRITE_CHUNK_SIZE = new HumanReadableBytes("4MiB"); + + public GoogleStorage(final Supplier storage) { this.storage = storage; } public void insert(final String bucket, final String path, AbstractInputStreamContent mediaContent) throws IOException { - Storage.Objects.Insert insertObject = storage.get().objects().insert(bucket, null, mediaContent); - insertObject.setName(path); - insertObject.getMediaHttpUploader().setDirectUploadEnabled(false); - insertObject.execute(); + storage.get().createFrom(getBlobInfo(bucket, path), mediaContent.getInputStream()); } - public InputStream get(final String bucket, final String path) throws IOException + public InputStream getInputStream(final String bucket, final String path) throws IOException { - return get(bucket, path, 0); + return getInputStream(bucket, path, 0, null, null); } - public InputStream get(final String bucket, final String path, long start) throws IOException + public InputStream getInputStream( + final String bucket, + final String path, + long start + ) throws IOException { - final Get get = storage.get().objects().get(bucket, path); - InputStream inputStream = get.executeMediaAsInputStream(); - inputStream.skip(start); - return inputStream; + return getInputStream(bucket, path, start, null, null); } - public StorageObject getMetadata(final String bucket, final String path) throws IOException + public InputStream getInputStream( + final String bucket, + final String path, + long start, + Long length + ) throws IOException { - return storage.get().objects().get(bucket, path).execute(); + return getInputStream(bucket, path, start, length, null); } - public void delete(final String bucket, final String path) throws IOException + public InputStream getInputStream( + final String bucket, + final String path, + long start, + @Nullable Long length, + @Nullable final Integer chunkSize + ) + throws IOException { - storage.get().objects().delete(bucket, path).execute(); + ReadChannel reader = storage.get().reader(bucket, path); + reader.seek(start); + if (length != null) { + reader.limit(start + length); + } + if (chunkSize != null) { + reader.setChunkSize(chunkSize); + } + // Using default read buffer size (2 MB) + return Channels.newInputStream(reader); } - public boolean exists(final String bucket, final String path) + public OutputStream getObjectOutputStream( + final String bucket, + final String path, + @Nullable final Integer chunkSize + ) + { + WriteChannel writer = storage.get().writer(getBlobInfo(bucket, path)); + // Limit GCS internal write buffer memory to prevent OOM errors + writer.setChunkSize(chunkSize == null ? DEFAULT_WRITE_CHUNK_SIZE.getBytesInInt() : chunkSize); + + return Channels.newOutputStream(writer); + } + + public GoogleStorageObjectMetadata getMetadata( + final String bucket, + final String path + ) throws IOException + { + Blob blob = storage.get().get(bucket, path, Storage.BlobGetOption.fields(Storage.BlobField.values())); + if (blob == null) { + throw new IOE("Failed fetching google cloud storage object [bucket: %s, path: %s]", bucket, path); + } + return new GoogleStorageObjectMetadata( + blob.getBucket(), + blob.getName(), + blob.getSize(), + blob.getUpdateTimeOffsetDateTime() + .toEpochSecond() + ); + } + + public void delete(final String bucket, final String path) throws IOException { - try { - return storage.get().objects().get(bucket, path).executeUsingHead().isSuccessStatusCode(); + if (!storage.get().delete(bucket, path)) { + throw new IOE( + "Failed deleting google cloud storage object [bucket: %s path: %s]", + bucket, + path + ); } - catch (Exception e) { - return false; + } + + /** + * Deletes a list of objects in a bucket + * + * @param bucket GCS bucket + * @param paths Iterable for absolute paths of objects to be deleted inside the bucket + */ + public void batchDelete(final String bucket, final Iterable paths) throws IOException + { + List statuses = storage.get().delete(Iterables.transform(paths, input -> BlobId.of(bucket, input))); + if (statuses.contains(false)) { + throw new IOE("Failed deleting google cloud storage object(s)"); } } - + + public boolean exists(final String bucket, final String path) + { + Blob blob = storage.get().get(bucket, path); + return blob != null; + } + public long size(final String bucket, final String path) throws IOException { - return storage.get().objects().get(bucket, path).execute().getSize().longValue(); + Blob blob = storage.get().get(bucket, path, Storage.BlobGetOption.fields(Storage.BlobField.SIZE)); + if (blob == null) { + throw new IOE("Failed fetching google cloud storage object [bucket: %s, path: %s]", bucket, path); + } + return blob.getSize(); } public String version(final String bucket, final String path) throws IOException { - return storage.get().objects().get(bucket, path).execute().getEtag(); + Blob blob = storage.get().get(bucket, path, Storage.BlobGetOption.fields(Storage.BlobField.GENERATION)); + if (blob == null) { + throw new IOE("Failed fetching google cloud storage object [bucket: %s, path: %s]", bucket, path); + } + return blob.getGeneratedId(); } - public Storage.Objects.List list(final String bucket) throws IOException + /*** + * Provides a paged listing of objects for a given bucket and prefix + * @param bucket GCS bucket + * @param prefix Path prefix + * @param pageSize Number of objects per page + * @param pageToken Continuation token for the next page; use null for the first page + * or the nextPageToken from the previous {@link GoogleStorageObjectPage} + */ + public GoogleStorageObjectPage list( + final String bucket, + @Nullable final String prefix, + @Nullable final Long pageSize, + @Nullable final String pageToken + ) throws IOException { - return storage.get().objects().list(bucket); + List options = new ArrayList<>(); + + if (prefix != null) { + options.add(Storage.BlobListOption.prefix(prefix)); + } + + if (pageSize != null) { + options.add(Storage.BlobListOption.pageSize(pageSize)); + } + + if (pageToken != null) { + options.add(Storage.BlobListOption.pageToken(pageToken)); + } + + Page blobPage = storage.get().list(bucket, options.toArray(new Storage.BlobListOption[0])); + + if (blobPage == null) { + throw new IOE("Failed fetching google cloud storage object [bucket: %s, prefix: %s]", bucket, prefix); + } + + + List googleStorageObjectMetadataList = + blobPage.streamValues() + .map(blob -> new GoogleStorageObjectMetadata( + blob.getBucket(), + blob.getName(), + blob.getSize(), + blob.getUpdateTimeOffsetDateTime() + .toEpochSecond() + )) + .collect(Collectors.toList()); + + return new GoogleStorageObjectPage(googleStorageObjectMetadataList, blobPage.getNextPageToken()); + + } + + + private BlobInfo getBlobInfo(final String bucket, final String path) + { + BlobId blobId = BlobId.of(bucket, path); + return BlobInfo.newBuilder(blobId).build(); + } } diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleStorageDruidModule.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleStorageDruidModule.java index cb90cb51fb81..0467906a6ca4 100644 --- a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleStorageDruidModule.java +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleStorageDruidModule.java @@ -23,10 +23,8 @@ import com.fasterxml.jackson.databind.Module; import com.fasterxml.jackson.databind.jsontype.NamedType; import com.fasterxml.jackson.databind.module.SimpleModule; -import com.google.api.client.http.HttpRequestInitializer; -import com.google.api.client.http.HttpTransport; -import com.google.api.client.json.JsonFactory; -import com.google.api.services.storage.Storage; +import com.google.cloud.storage.Storage; +import com.google.cloud.storage.StorageOptions; import com.google.common.collect.ImmutableList; import com.google.inject.Binder; import com.google.inject.Provider; @@ -86,6 +84,7 @@ public void configure(Binder binder) { LOG.info("Configuring GoogleStorageDruidModule..."); + JsonConfigProvider.bind(binder, "druid.google", GoogleInputDataConfig.class); JsonConfigProvider.bind(binder, "druid.google", GoogleAccountConfig.class); Binders.dataSegmentPusherBinder(binder).addBinding(SCHEME).to(GoogleDataSegmentPusher.class) @@ -104,16 +103,9 @@ public void configure(Binder binder) @Provides @LazySingleton - public Storage getGcpStorage( - HttpTransport httpTransport, - JsonFactory jsonFactory, - HttpRequestInitializer requestInitializer - ) + public Storage getGcpStorage() { - return new Storage - .Builder(httpTransport, jsonFactory, requestInitializer) - .setApplicationName(APPLICATION_NAME) - .build(); + return StorageOptions.getDefaultInstance().getService(); } /** diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleStorageObjectMetadata.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleStorageObjectMetadata.java new file mode 100644 index 000000000000..87feb774a5d8 --- /dev/null +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleStorageObjectMetadata.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.storage.google; + +import java.util.Objects; + +public class GoogleStorageObjectMetadata +{ + final String bucket; + final String name; + final Long size; + Long lastUpdateTime; + + public GoogleStorageObjectMetadata(final String bucket, final String name, final Long size, final Long lastUpdateTime) + { + this.bucket = bucket; + this.name = name; + this.size = size; + this.lastUpdateTime = lastUpdateTime; + } + + public void setLastUpdateTime(Long lastUpdateTime) + { + this.lastUpdateTime = lastUpdateTime; + } + + + public String getBucket() + { + return bucket; + } + + public String getName() + { + return name; + } + + public Long getSize() + { + return size; + } + + public Long getLastUpdateTime() + { + return lastUpdateTime; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + GoogleStorageObjectMetadata that = (GoogleStorageObjectMetadata) o; + return Objects.equals(bucket, that.bucket) + && Objects.equals(name, that.name) + && Objects.equals(size, that.size); + } + + @Override + public int hashCode() + { + return Objects.hash(bucket, name, size); + } + + @Override + public String toString() + { + return "GoogleStorageObjectMetadata{" + + "bucket='" + bucket + '\'' + + ", name='" + name + '\'' + + ", size=" + size + + ", lastUpdateTime=" + lastUpdateTime + + '}'; + } +} diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleStorageObjectPage.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleStorageObjectPage.java new file mode 100644 index 000000000000..e58059125a15 --- /dev/null +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleStorageObjectPage.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.storage.google; + +import javax.annotation.Nullable; +import java.util.List; + +public class GoogleStorageObjectPage +{ + final List objectList; + + @Nullable + final String nextPageToken; + + public GoogleStorageObjectPage( + List objectList, + String nextPageToken + ) + { + this.objectList = objectList; + this.nextPageToken = nextPageToken; + } + + public List getObjectList() + { + return objectList; + } + + @Nullable + public String getNextPageToken() + { + return nextPageToken; + } +} diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleTaskLogs.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleTaskLogs.java index ae4024172a6f..4f7444f8ea9e 100644 --- a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleTaskLogs.java +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleTaskLogs.java @@ -204,7 +204,7 @@ public void killOlderThan(long timestamp) throws IOException inputDataConfig, config.getBucket(), config.getPrefix(), - (object) -> object.getUpdated().getValue() < timestamp + (object) -> object.getLastUpdateTime() < timestamp ); } catch (Exception e) { diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleTimestampVersionedDataFinder.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleTimestampVersionedDataFinder.java index d1ed8a7ef6a2..b93128cc2fa4 100644 --- a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleTimestampVersionedDataFinder.java +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleTimestampVersionedDataFinder.java @@ -19,8 +19,6 @@ package org.apache.druid.storage.google; -import com.google.api.services.storage.model.Objects; -import com.google.api.services.storage.model.StorageObject; import com.google.inject.Inject; import org.apache.druid.data.SearchableVersionedDataFinder; import org.apache.druid.data.input.impl.CloudObjectLocation; @@ -49,21 +47,27 @@ public URI getLatestVersion(URI descriptorBase, @Nullable Pattern pattern) long mostRecent = Long.MIN_VALUE; URI latest = null; final CloudObjectLocation baseLocation = new CloudObjectLocation(descriptorBase); - final Objects objects = storage.list(baseLocation.getBucket()).setPrefix(baseLocation.getPath()).setMaxResults(MAX_LISTING_KEYS).execute(); - for (StorageObject storageObject : objects.getItems()) { - if (GoogleUtils.isDirectoryPlaceholder(storageObject)) { + final GoogleStorageObjectPage googleStorageObjectPage = storage.list( + baseLocation.getBucket(), + baseLocation.getPath(), + MAX_LISTING_KEYS, + null + ); + for (GoogleStorageObjectMetadata objectMetadata : googleStorageObjectPage.getObjectList()) { + if (GoogleUtils.isDirectoryPlaceholder(objectMetadata)) { continue; } // remove path prefix from file name - final CloudObjectLocation objectLocation = new CloudObjectLocation(storageObject.getBucket(), - storageObject.getName() + final CloudObjectLocation objectLocation = new CloudObjectLocation( + objectMetadata.getBucket(), + objectMetadata.getName() ); final String keyString = StringUtils - .maybeRemoveLeadingSlash(storageObject.getName().substring(baseLocation.getPath().length())); + .maybeRemoveLeadingSlash(objectMetadata.getName().substring(baseLocation.getPath().length())); if (pattern != null && !pattern.matcher(keyString).matches()) { continue; } - final long latestModified = storageObject.getUpdated().getValue(); + final long latestModified = objectMetadata.getLastUpdateTime(); if (latestModified >= mostRecent) { mostRecent = latestModified; latest = objectLocation.toUri(GoogleStorageDruidModule.SCHEME_GS); @@ -72,7 +76,7 @@ public URI getLatestVersion(URI descriptorBase, @Nullable Pattern pattern) return latest; } catch (IOException e) { - throw new RuntimeException(e); + throw new RuntimeException(); } } } diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleUtils.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleUtils.java index 25b4f3286ea7..a819442ef351 100644 --- a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleUtils.java +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleUtils.java @@ -20,7 +20,6 @@ package org.apache.druid.storage.google; import com.google.api.client.http.HttpResponseException; -import com.google.api.services.storage.model.StorageObject; import com.google.common.base.Predicate; import com.google.common.collect.ImmutableList; import org.apache.druid.data.input.impl.CloudObjectLocation; @@ -45,22 +44,22 @@ public static boolean isRetryable(Throwable t) return t instanceof IOException; } - static T retryGoogleCloudStorageOperation(RetryUtils.Task f) throws Exception + public static T retryGoogleCloudStorageOperation(RetryUtils.Task f) throws Exception { return RetryUtils.retry(f, GOOGLE_RETRY, RetryUtils.DEFAULT_MAX_TRIES); } - public static URI objectToUri(StorageObject object) + public static URI objectToUri(GoogleStorageObjectMetadata object) { return objectToCloudObjectLocation(object).toUri(GoogleStorageDruidModule.SCHEME_GS); } - public static CloudObjectLocation objectToCloudObjectLocation(StorageObject object) + public static CloudObjectLocation objectToCloudObjectLocation(GoogleStorageObjectMetadata object) { return new CloudObjectLocation(object.getBucket(), object.getName()); } - public static Iterator lazyFetchingStorageObjectsIterator( + public static Iterator lazyFetchingStorageObjectsIterator( final GoogleStorage storage, final Iterator uris, final long maxListingLength @@ -85,18 +84,18 @@ public static void deleteObjectsInPath( GoogleInputDataConfig config, String bucket, String prefix, - Predicate filter + Predicate filter ) throws Exception { - final Iterator iterator = lazyFetchingStorageObjectsIterator( + final Iterator iterator = lazyFetchingStorageObjectsIterator( storage, ImmutableList.of(new CloudObjectLocation(bucket, prefix).toUri(GoogleStorageDruidModule.SCHEME_GS)).iterator(), config.getMaxListingLength() ); while (iterator.hasNext()) { - final StorageObject nextObject = iterator.next(); + final GoogleStorageObjectMetadata nextObject = iterator.next(); if (filter.apply(nextObject)) { retryGoogleCloudStorageOperation(() -> { storage.delete(nextObject.getBucket(), nextObject.getName()); @@ -110,13 +109,13 @@ public static void deleteObjectsInPath( * Similar to {@link org.apache.druid.storage.s3.ObjectSummaryIterator#isDirectoryPlaceholder} * Copied to avoid creating dependency on s3 extensions */ - public static boolean isDirectoryPlaceholder(final StorageObject storageObject) + public static boolean isDirectoryPlaceholder(final GoogleStorageObjectMetadata objectMetadata) { // Recognize "standard" directory place-holder indications - if (storageObject.getName().endsWith("/") && storageObject.getSize().intValue() == 0) { + if (objectMetadata.getName().endsWith("/") && objectMetadata.getSize().intValue() == 0) { return true; } // Recognize place-holder objects created by the Google Storage console or S3 Organizer Firefox extension. - return storageObject.getName().endsWith("_$folder$") && storageObject.getSize().intValue() == 0; + return objectMetadata.getName().endsWith("_$folder$") && objectMetadata.getSize().intValue() == 0; } } diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/ObjectStorageIterator.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/ObjectStorageIterator.java index 10275112f6f4..b1ad9871776d 100644 --- a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/ObjectStorageIterator.java +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/ObjectStorageIterator.java @@ -19,9 +19,6 @@ package org.apache.druid.storage.google; -import com.google.api.services.storage.Storage; -import com.google.api.services.storage.model.Objects; -import com.google.api.services.storage.model.StorageObject; import org.apache.druid.java.util.common.StringUtils; import java.io.IOException; @@ -29,61 +26,48 @@ import java.util.Iterator; import java.util.NoSuchElementException; -public class ObjectStorageIterator implements Iterator +public class ObjectStorageIterator implements Iterator { private final GoogleStorage storage; private final Iterator uris; private final long maxListingLength; - - private Storage.Objects.List listRequest; - private Objects results; + private GoogleStorageObjectPage googleStorageObjectPage; private URI currentUri; private String nextPageToken; - private Iterator storageObjectsIterator; - private StorageObject currentObject; + private Iterator blobIterator; + private GoogleStorageObjectMetadata currentObject; public ObjectStorageIterator(GoogleStorage storage, Iterator uris, long maxListingLength) { this.storage = storage; this.uris = uris; this.maxListingLength = maxListingLength; - this.nextPageToken = null; - prepareNextRequest(); - fetchNextBatch(); + advanceURI(); + fetchNextPage(); advanceStorageObject(); } - private void prepareNextRequest() + + private void advanceURI() + { + currentUri = uris.next(); + } + + private void fetchNextPage() { try { - currentUri = uris.next(); String currentBucket = currentUri.getAuthority(); String currentPrefix = StringUtils.maybeRemoveLeadingSlash(currentUri.getPath()); - nextPageToken = null; - listRequest = storage.list(currentBucket) - .setPrefix(currentPrefix) - .setMaxResults(maxListingLength); - + googleStorageObjectPage = storage.list(currentBucket, currentPrefix, maxListingLength, nextPageToken); + blobIterator = googleStorageObjectPage.getObjectList().iterator(); + nextPageToken = googleStorageObjectPage.getNextPageToken(); } catch (IOException io) { throw new RuntimeException(io); } } - private void fetchNextBatch() - { - try { - listRequest.setPageToken(nextPageToken); - results = GoogleUtils.retryGoogleCloudStorageOperation(() -> listRequest.execute()); - storageObjectsIterator = results.getItems().iterator(); - nextPageToken = results.getNextPageToken(); - } - catch (Exception ex) { - throw new RuntimeException(ex); - } - } - @Override public boolean hasNext() { @@ -91,35 +75,35 @@ public boolean hasNext() } @Override - public StorageObject next() + public GoogleStorageObjectMetadata next() { if (!hasNext()) { throw new NoSuchElementException(); } - final StorageObject retVal = currentObject; + final GoogleStorageObjectMetadata retVal = currentObject; advanceStorageObject(); return retVal; } private void advanceStorageObject() { - while (storageObjectsIterator.hasNext() || nextPageToken != null || uris.hasNext()) { - while (storageObjectsIterator.hasNext()) { - final StorageObject next = storageObjectsIterator.next(); + while (blobIterator.hasNext() || nextPageToken != null || uris.hasNext()) { + while (blobIterator.hasNext()) { + final GoogleStorageObjectMetadata next = blobIterator.next(); // list with prefix can return directories, but they should always end with `/`, ignore them. // also skips empty objects. - if (!next.getName().endsWith("/") && next.getSize().signum() > 0) { + if (!next.getName().endsWith("/") && Long.signum(next.getSize()) > 0) { currentObject = next; return; } } if (nextPageToken != null) { - fetchNextBatch(); + fetchNextPage(); } else if (uris.hasNext()) { - prepareNextRequest(); - fetchNextBatch(); + advanceURI(); + fetchNextPage(); } } diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleInputRange.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleInputRange.java new file mode 100644 index 000000000000..a3d1c863a755 --- /dev/null +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleInputRange.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.storage.google.output; + +import java.util.Objects; + +public class GoogleInputRange +{ + private final long start; + private final long size; + private final String bucket; + private final String path; + + public GoogleInputRange(long start, long size, String bucket, String path) + { + this.start = start; + this.size = size; + this.bucket = bucket; + this.path = path; + } + + public long getStart() + { + return start; + } + + public long getSize() + { + return size; + } + + public String getBucket() + { + return bucket; + } + + public String getPath() + { + return path; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + GoogleInputRange that = (GoogleInputRange) o; + return start == that.start + && size == that.size + && Objects.equals(bucket, that.bucket) + && Objects.equals(path, that.path); + } + + @Override + public int hashCode() + { + return Objects.hash(start, size, bucket, path); + } + + @Override + public String toString() + { + return "GoogleInputRange{" + + "start=" + start + + ", size=" + size + + ", bucket='" + bucket + '\'' + + ", path='" + path + '\'' + + '}'; + } +} diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleOutputConfig.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleOutputConfig.java new file mode 100644 index 000000000000..c9c78151ae99 --- /dev/null +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleOutputConfig.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.storage.google.output; + +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.error.InvalidInput; +import org.apache.druid.java.util.common.HumanReadableBytes; +import org.apache.druid.java.util.common.RetryUtils; + +import javax.annotation.Nullable; +import java.io.File; +import java.util.Objects; + +public class GoogleOutputConfig +{ + + @JsonProperty + private final String bucket; + + @JsonProperty + private final String prefix; + + @JsonProperty + private final File tempDir; + + @JsonProperty + private HumanReadableBytes chunkSize; + + private static final HumanReadableBytes DEFAULT_CHUNK_SIZE = new HumanReadableBytes("4MiB"); + + // GCS imposed minimum chunk size + private static final long GOOGLE_MIN_CHUNK_SIZE_BYTES = new HumanReadableBytes("256KiB").getBytes(); + + // Self-imposed max chunk size since this size is allocated per open file consuming significant memory. + private static final long GOOGLE_MAX_CHUNK_SIZE_BYTES = new HumanReadableBytes("16MiB").getBytes(); + + + @JsonProperty + private int maxRetry; + + public GoogleOutputConfig( + final String bucket, + final String prefix, + final File tempDir, + @Nullable final HumanReadableBytes chunkSize, + @Nullable final Integer maxRetry + ) + { + this.bucket = bucket; + this.prefix = prefix; + this.tempDir = tempDir; + this.chunkSize = chunkSize != null ? chunkSize : DEFAULT_CHUNK_SIZE; + this.maxRetry = maxRetry != null ? maxRetry : RetryUtils.DEFAULT_MAX_TRIES; + + validateFields(); + } + + public String getBucket() + { + return bucket; + } + + public String getPrefix() + { + return prefix; + } + + public File getTempDir() + { + return tempDir; + } + + public HumanReadableBytes getChunkSize() + { + return chunkSize; + } + + public Integer getMaxRetry() + { + return maxRetry; + } + + private void validateFields() + { + if (chunkSize.getBytes() < GOOGLE_MIN_CHUNK_SIZE_BYTES || chunkSize.getBytes() > GOOGLE_MAX_CHUNK_SIZE_BYTES) { + throw InvalidInput.exception( + "'chunkSize' [%d] bytes to the GoogleConfig should be between [%d] bytes and [%d] bytes", + chunkSize.getBytes(), + GOOGLE_MIN_CHUNK_SIZE_BYTES, + GOOGLE_MAX_CHUNK_SIZE_BYTES + ); + } + } + + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + GoogleOutputConfig that = (GoogleOutputConfig) o; + return Objects.equals(bucket, that.bucket) + && Objects.equals(prefix, that.prefix) + && Objects.equals(tempDir, that.tempDir) + && Objects.equals(chunkSize, that.chunkSize) + && Objects.equals(maxRetry, that.maxRetry); + } + + @Override + public int hashCode() + { + return Objects.hash(bucket, prefix, tempDir, chunkSize, maxRetry); + } + + @Override + public String toString() + { + return "GoogleOutputConfig{" + + "container='" + bucket + '\'' + + ", prefix='" + prefix + '\'' + + ", tempDir=" + tempDir + + ", chunkSize=" + chunkSize + + ", maxRetry=" + maxRetry + + '}'; + } +} diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleStorageConnector.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleStorageConnector.java new file mode 100644 index 000000000000..6edbad3beaf1 --- /dev/null +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleStorageConnector.java @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.storage.google.output; + +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import com.google.common.collect.Iterators; +import org.apache.druid.data.input.impl.CloudObjectLocation; +import org.apache.druid.data.input.impl.prefetch.ObjectOpenFunction; +import org.apache.druid.java.util.common.FileUtils; +import org.apache.druid.java.util.common.RE; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.storage.google.GoogleInputDataConfig; +import org.apache.druid.storage.google.GoogleStorage; +import org.apache.druid.storage.google.GoogleStorageDruidModule; +import org.apache.druid.storage.google.GoogleStorageObjectMetadata; +import org.apache.druid.storage.google.GoogleUtils; +import org.apache.druid.storage.remote.ChunkingStorageConnector; +import org.apache.druid.storage.remote.ChunkingStorageConnectorParameters; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Iterator; + +public class GoogleStorageConnector extends ChunkingStorageConnector +{ + + private static final String DELIM = "/"; + private static final Joiner JOINER = Joiner.on(DELIM).skipNulls(); + private static final Logger log = new Logger(GoogleStorageConnector.class); + + private final GoogleStorage storage; + private final GoogleOutputConfig config; + private final GoogleInputDataConfig inputDataConfig; + + public GoogleStorageConnector( + GoogleOutputConfig config, + GoogleStorage googleStorage, + GoogleInputDataConfig inputDataConfig + ) + { + this.storage = googleStorage; + this.config = config; + this.inputDataConfig = inputDataConfig; + + Preconditions.checkNotNull(config, "config is null"); + Preconditions.checkNotNull(config.getTempDir(), "tempDir is null in google config"); + + try { + FileUtils.mkdirp(config.getTempDir()); + } + catch (IOException e) { + throw new RE( + e, + StringUtils.format("Cannot create tempDir : [%s] for google storage connector", config.getTempDir()) + ); + } + } + + @Override + public boolean pathExists(String path) + { + return storage.exists(config.getBucket(), objectPath(path)); + } + + @Override + public OutputStream write(String path) + { + return storage.getObjectOutputStream(config.getBucket(), objectPath(path), config.getChunkSize().getBytesInInt()); + } + + @Override + public void deleteFile(String path) throws IOException + { + try { + final String fullPath = objectPath(path); + log.debug("Deleting file at bucket: [%s], path: [%s]", config.getBucket(), fullPath); + + GoogleUtils.retryGoogleCloudStorageOperation( + () -> { + storage.delete(config.getBucket(), fullPath); + return null; + } + ); + } + catch (Exception e) { + log.error("Error occurred while deleting file at path [%s]. Error: [%s]", path, e.getMessage()); + throw new IOException(e); + } + } + + @Override + public void deleteFiles(Iterable paths) throws IOException + { + storage.batchDelete(config.getBucket(), Iterables.transform(paths, this::objectPath)); + } + + @Override + public void deleteRecursively(String path) throws IOException + { + final String fullPath = objectPath(path); + Iterator storageObjects = GoogleUtils.lazyFetchingStorageObjectsIterator( + storage, + ImmutableList.of(new CloudObjectLocation(config.getBucket(), fullPath) + .toUri(GoogleStorageDruidModule.SCHEME_GS)).iterator(), + inputDataConfig.getMaxListingLength() + ); + + storage.batchDelete( + config.getBucket(), + () -> Iterators.transform(storageObjects, GoogleStorageObjectMetadata::getName) + ); + } + + @Override + public Iterator listDir(String dirName) + { + final String fullPath = objectPath(dirName); + Iterator storageObjects = GoogleUtils.lazyFetchingStorageObjectsIterator( + storage, + ImmutableList.of(new CloudObjectLocation(config.getBucket(), fullPath) + .toUri(GoogleStorageDruidModule.SCHEME_GS)).iterator(), + inputDataConfig.getMaxListingLength() + ); + + return Iterators.transform( + storageObjects, + storageObject -> { + String[] split = storageObject.getName().split(fullPath, 2); + if (split.length > 1) { + return split[1]; + } else { + return ""; + } + } + ); + } + + @Override + public ChunkingStorageConnectorParameters buildInputParams(String path) throws IOException + { + long size = storage.size(config.getBucket(), objectPath(path)); + return buildInputParams(path, 0, size); + } + + @Override + public ChunkingStorageConnectorParameters buildInputParams(String path, long from, long size) + { + ChunkingStorageConnectorParameters.Builder builder = new ChunkingStorageConnectorParameters.Builder<>(); + builder.start(from); + builder.end(from + size); + builder.cloudStoragePath(objectPath(path)); + builder.tempDirSupplier(config::getTempDir); + builder.maxRetry(config.getMaxRetry()); + builder.retryCondition(GoogleUtils.GOOGLE_RETRY); + builder.objectSupplier(((start, end) -> new GoogleInputRange( + start, + end - start, + config.getBucket(), + objectPath(path) + ))); + builder.objectOpenFunction(new ObjectOpenFunction() + { + @Override + public InputStream open(GoogleInputRange googleInputRange) throws IOException + { + return storage.getInputStream( + googleInputRange.getBucket(), + googleInputRange.getPath(), + googleInputRange.getStart(), + googleInputRange.getSize() + ); + } + + @Override + public InputStream open(GoogleInputRange googleInputRange, long offset) throws IOException + { + long rangeStart = googleInputRange.getStart() + offset; + return storage.getInputStream( + googleInputRange.getBucket(), + googleInputRange.getPath(), + rangeStart, + googleInputRange.getSize() + ); + } + }); + + return builder.build(); + } + + private String objectPath(String path) + { + return JOINER.join(config.getPrefix(), path); + } +} diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleStorageConnectorModule.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleStorageConnectorModule.java new file mode 100644 index 000000000000..cba33b5804c0 --- /dev/null +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleStorageConnectorModule.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.storage.google.output; + +import com.fasterxml.jackson.databind.Module; +import com.fasterxml.jackson.databind.module.SimpleModule; +import com.google.inject.Binder; +import org.apache.druid.initialization.DruidModule; + +import java.util.Collections; +import java.util.List; + +public class GoogleStorageConnectorModule implements DruidModule +{ + @Override + public List getJacksonModules() + { + return Collections.singletonList( + new SimpleModule(this.getClass().getSimpleName()).registerSubtypes(GoogleStorageConnectorProvider.class)); + } + + @Override + public void configure(Binder binder) + { + + } +} diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleStorageConnectorProvider.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleStorageConnectorProvider.java new file mode 100644 index 000000000000..f33a3b1f44db --- /dev/null +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleStorageConnectorProvider.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.storage.google.output; + +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; +import org.apache.druid.java.util.common.HumanReadableBytes; +import org.apache.druid.storage.StorageConnector; +import org.apache.druid.storage.StorageConnectorProvider; +import org.apache.druid.storage.google.GoogleInputDataConfig; +import org.apache.druid.storage.google.GoogleStorage; +import org.apache.druid.storage.google.GoogleStorageDruidModule; + +import javax.annotation.Nullable; +import java.io.File; + +@JsonTypeName(GoogleStorageDruidModule.SCHEME) +public class GoogleStorageConnectorProvider extends GoogleOutputConfig implements StorageConnectorProvider +{ + + @JacksonInject + GoogleStorage googleStorage; + + @JacksonInject + GoogleInputDataConfig googleInputDataConfig; + + @JsonCreator + public GoogleStorageConnectorProvider( + @JsonProperty(value = "bucket", required = true) String bucket, + @JsonProperty(value = "prefix", required = true) String prefix, + @JsonProperty(value = "tempDir", required = true) File tempDir, + @JsonProperty(value = "chunkSize") @Nullable HumanReadableBytes chunkSize, + @JsonProperty(value = "maxRetry") @Nullable Integer maxRetry + ) + { + super(bucket, prefix, tempDir, chunkSize, maxRetry); + } + + @Override + public StorageConnector get() + { + return new GoogleStorageConnector(this, googleStorage, googleInputDataConfig); + } + +} diff --git a/extensions-core/google-extensions/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule b/extensions-core/google-extensions/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule index 3d89b7320313..92cb05897cc3 100644 --- a/extensions-core/google-extensions/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule +++ b/extensions-core/google-extensions/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule @@ -13,4 +13,5 @@ # See the License for the specific language governing permissions and # limitations under the License. -org.apache.druid.storage.google.GoogleStorageDruidModule +org.apache.druid.storage.google.output.GoogleStorageConnectorModule +org.apache.druid.storage.google.GoogleStorageDruidModule \ No newline at end of file diff --git a/extensions-core/google-extensions/src/test/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSourceTest.java b/extensions-core/google-extensions/src/test/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSourceTest.java index 556eb840ea9f..ae968aa3d6fa 100644 --- a/extensions-core/google-extensions/src/test/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSourceTest.java +++ b/extensions-core/google-extensions/src/test/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSourceTest.java @@ -23,9 +23,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.module.SimpleModule; import com.fasterxml.jackson.module.guice.ObjectMapperModule; -import com.google.api.services.storage.Storage; -import com.google.api.services.storage.model.Objects; -import com.google.api.services.storage.model.StorageObject; import com.google.common.collect.ImmutableList; import com.google.inject.Binder; import com.google.inject.Guice; @@ -56,6 +53,8 @@ import org.apache.druid.java.util.common.parsers.JSONPathSpec; import org.apache.druid.storage.google.GoogleInputDataConfig; import org.apache.druid.storage.google.GoogleStorage; +import org.apache.druid.storage.google.GoogleStorageObjectMetadata; +import org.apache.druid.storage.google.GoogleStorageObjectPage; import org.apache.druid.testing.InitializedNullHandlingTest; import org.apache.druid.utils.CompressionUtils; import org.easymock.EasyMock; @@ -68,7 +67,6 @@ import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; -import java.math.BigInteger; import java.net.URI; import java.util.ArrayList; import java.util.Arrays; @@ -114,6 +112,10 @@ public class GoogleCloudStorageInputSourceTest extends InitializedNullHandlingTe private static final byte[] CONTENT = StringUtils.toUtf8(StringUtils.format("%d,hello,world", NOW.getMillis())); + private static final String BUCKET = "TEST_BUCKET"; + private static final String OBJECT_NAME = "TEST_NAME"; + private static final Long UPDATE_TIME = 111L; + @Rule public ExpectedException expectedException = ExpectedException.none(); @@ -207,21 +209,31 @@ public void testGetTypes() } @Test - public void testWithUrisSplit() throws Exception + public void testWithUrisSplit() throws IOException { EasyMock.reset(STORAGE); + + GoogleStorageObjectMetadata objectMetadata = new GoogleStorageObjectMetadata( + BUCKET, + OBJECT_NAME, + (long) CONTENT.length, + UPDATE_TIME + ); + EasyMock.expect( STORAGE.getMetadata( EXPECTED_URIS.get(0).getAuthority(), StringUtils.maybeRemoveLeadingSlash(EXPECTED_URIS.get(0).getPath()) ) - ).andReturn(new StorageObject().setSize(BigInteger.valueOf(CONTENT.length))); + ).andReturn(objectMetadata); + EasyMock.expect( STORAGE.getMetadata( EXPECTED_URIS.get(1).getAuthority(), StringUtils.maybeRemoveLeadingSlash(EXPECTED_URIS.get(1).getPath()) ) - ).andReturn(new StorageObject().setSize(BigInteger.valueOf(CONTENT.length))); + ).andReturn(objectMetadata); + EasyMock.replay(STORAGE); GoogleCloudStorageInputSource inputSource = new GoogleCloudStorageInputSource( @@ -243,21 +255,28 @@ public void testWithUrisSplit() throws Exception } @Test - public void testWithUrisGlob() throws Exception + public void testWithUrisGlob() throws IOException { + GoogleStorageObjectMetadata objectMetadata = new GoogleStorageObjectMetadata( + BUCKET, + OBJECT_NAME, + (long) CONTENT.length, + UPDATE_TIME + ); + EasyMock.reset(STORAGE); EasyMock.expect( STORAGE.getMetadata( EXPECTED_URIS.get(0).getAuthority(), StringUtils.maybeRemoveLeadingSlash(EXPECTED_URIS.get(0).getPath()) ) - ).andReturn(new StorageObject().setSize(BigInteger.valueOf(CONTENT.length))); + ).andReturn(objectMetadata); EasyMock.expect( STORAGE.getMetadata( EXPECTED_URIS.get(1).getAuthority(), StringUtils.maybeRemoveLeadingSlash(EXPECTED_URIS.get(1).getPath()) ) - ).andReturn(new StorageObject().setSize(BigInteger.valueOf(CONTENT.length))); + ).andReturn(objectMetadata); EasyMock.replay(STORAGE); GoogleCloudStorageInputSource inputSource = new GoogleCloudStorageInputSource( STORAGE, @@ -488,28 +507,30 @@ private static void addExpectedPrefixObjects(URI prefix, List uris) throws { final String bucket = prefix.getAuthority(); - Storage.Objects.List listRequest = EasyMock.createMock(Storage.Objects.List.class); - EasyMock.expect(STORAGE.list(EasyMock.eq(bucket))).andReturn(listRequest).once(); - EasyMock.expect(listRequest.setPageToken(EasyMock.anyString())).andReturn(listRequest).once(); - EasyMock.expect(listRequest.setMaxResults((long) MAX_LISTING_LENGTH)).andReturn(listRequest).once(); - EasyMock.expect(listRequest.setPrefix(EasyMock.eq(StringUtils.maybeRemoveLeadingSlash(prefix.getPath())))) - .andReturn(listRequest) - .once(); + GoogleStorageObjectPage response = EasyMock.createMock(GoogleStorageObjectPage.class); - List mockObjects = new ArrayList<>(); + List mockObjects = new ArrayList<>(); for (URI uri : uris) { - StorageObject s = new StorageObject(); - s.setBucket(bucket); - s.setName(uri.getPath()); - s.setSize(BigInteger.valueOf(CONTENT.length)); + GoogleStorageObjectMetadata s = new GoogleStorageObjectMetadata( + bucket, + uri.getPath(), + (long) CONTENT.length, + UPDATE_TIME + ); mockObjects.add(s); } - Objects response = new Objects(); - response.setItems(mockObjects); - EasyMock.expect(listRequest.execute()).andReturn(response).once(); - EasyMock.expect(response.getItems()).andReturn(mockObjects).once(); - EasyMock.replay(listRequest); + EasyMock.expect(STORAGE.list( + EasyMock.eq(bucket), + EasyMock.eq(StringUtils.maybeRemoveLeadingSlash(prefix.getPath())), + EasyMock.eq((long) MAX_LISTING_LENGTH), + EasyMock.eq(null) + )).andReturn(response).once(); + + EasyMock.expect(response.getObjectList()).andReturn(mockObjects).once(); + EasyMock.expect(response.getNextPageToken()).andReturn(null).once(); + + EasyMock.replay(response); } private static void addExpectedGetObjectMock(URI uri) throws IOException @@ -517,7 +538,7 @@ private static void addExpectedGetObjectMock(URI uri) throws IOException CloudObjectLocation location = new CloudObjectLocation(uri); EasyMock.expect( - STORAGE.get(EasyMock.eq(location.getBucket()), EasyMock.eq(location.getPath()), EasyMock.eq(0L)) + STORAGE.getInputStream(EasyMock.eq(location.getBucket()), EasyMock.eq(location.getPath()), EasyMock.eq(0L)) ).andReturn(new ByteArrayInputStream(CONTENT)).once(); } @@ -529,7 +550,7 @@ private static void addExpectedGetCompressedObjectMock(URI uri) throws IOExcepti CompressionUtils.gzip(new ByteArrayInputStream(CONTENT), gzipped); EasyMock.expect( - STORAGE.get(EasyMock.eq(location.getBucket()), EasyMock.eq(location.getPath()), EasyMock.eq(0L)) + STORAGE.getInputStream(EasyMock.eq(location.getBucket()), EasyMock.eq(location.getPath()), EasyMock.eq(0L)) ).andReturn(new ByteArrayInputStream(gzipped.toByteArray())).once(); } diff --git a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleByteSourceTest.java b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleByteSourceTest.java index a65f4750d3b6..db5c0e724bd3 100644 --- a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleByteSourceTest.java +++ b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleByteSourceTest.java @@ -36,7 +36,7 @@ public void openStreamTest() throws IOException GoogleStorage storage = createMock(GoogleStorage.class); InputStream stream = createMock(InputStream.class); - EasyMock.expect(storage.get(bucket, path)).andReturn(stream); + EasyMock.expect(storage.getInputStream(bucket, path)).andReturn(stream); replayAll(); @@ -54,7 +54,7 @@ public void openStreamWithRecoverableErrorTest() throws IOException final String path = "/path/to/file"; GoogleStorage storage = createMock(GoogleStorage.class); - EasyMock.expect(storage.get(bucket, path)).andThrow(new IOException("")); + EasyMock.expect(storage.getInputStream(bucket, path)).andThrow(new IOException("")); replayAll(); diff --git a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleDataSegmentKillerTest.java b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleDataSegmentKillerTest.java index 97a0e61dce3f..8d9612f4d8d9 100644 --- a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleDataSegmentKillerTest.java +++ b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleDataSegmentKillerTest.java @@ -24,8 +24,6 @@ import com.google.api.client.http.HttpHeaders; import com.google.api.client.http.HttpResponseException; import com.google.api.client.json.jackson2.JacksonFactory; -import com.google.api.services.storage.Storage; -import com.google.api.services.storage.model.StorageObject; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import org.apache.druid.java.util.common.ISE; @@ -169,17 +167,10 @@ public void test_killAll_accountConfigWithNullBucketAndPrefix_throwsISEException @Test public void test_killAll_noException_deletesAllTaskLogs() throws IOException { - StorageObject object1 = GoogleTestUtils.newStorageObject(BUCKET, KEY_1, TIME_0); - StorageObject object2 = GoogleTestUtils.newStorageObject(BUCKET, KEY_2, TIME_1); + GoogleStorageObjectMetadata object1 = GoogleTestUtils.newStorageObject(BUCKET, KEY_1, TIME_0); + GoogleStorageObjectMetadata object2 = GoogleTestUtils.newStorageObject(BUCKET, KEY_2, TIME_1); - Storage.Objects.List listRequest = GoogleTestUtils.expectListRequest(storage, PREFIX_URI); - - GoogleTestUtils.expectListObjects( - listRequest, - PREFIX_URI, - MAX_KEYS, - ImmutableList.of(object1, object2) - ); + GoogleTestUtils.expectListObjectsPageRequest(storage, PREFIX_URI, MAX_KEYS, ImmutableList.of(object1, object2)); GoogleTestUtils.expectDeleteObjects( storage, @@ -190,29 +181,22 @@ public void test_killAll_noException_deletesAllTaskLogs() throws IOException EasyMock.expect(accountConfig.getPrefix()).andReturn(PREFIX).anyTimes(); EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_KEYS); - EasyMock.replay(listRequest, accountConfig, inputDataConfig, storage); + EasyMock.replay(accountConfig, inputDataConfig, storage); GoogleDataSegmentKiller killer = new GoogleDataSegmentKiller(storage, accountConfig, inputDataConfig); killer.killAll(); - EasyMock.verify(listRequest, accountConfig, inputDataConfig, storage); + EasyMock.verify(accountConfig, inputDataConfig, storage); } @Test public void test_killAll_recoverableExceptionWhenDeletingObjects_deletesAllTaskLogs() throws IOException { - StorageObject object1 = GoogleTestUtils.newStorageObject(BUCKET, KEY_1, TIME_0); + GoogleStorageObjectMetadata object1 = GoogleTestUtils.newStorageObject(BUCKET, KEY_1, TIME_0); - Storage.Objects.List listRequest = GoogleTestUtils.expectListRequest(storage, PREFIX_URI); - - GoogleTestUtils.expectListObjects( - listRequest, - PREFIX_URI, - MAX_KEYS, - ImmutableList.of(object1) - ); + GoogleTestUtils.expectListObjectsPageRequest(storage, PREFIX_URI, MAX_KEYS, ImmutableList.of(object1)); GoogleTestUtils.expectDeleteObjects( storage, @@ -224,30 +208,22 @@ public void test_killAll_recoverableExceptionWhenDeletingObjects_deletesAllTaskL EasyMock.expect(accountConfig.getPrefix()).andReturn(PREFIX).anyTimes(); EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_KEYS); - EasyMock.replay(listRequest, accountConfig, inputDataConfig, storage); + EasyMock.replay(accountConfig, inputDataConfig, storage); GoogleDataSegmentKiller killer = new GoogleDataSegmentKiller(storage, accountConfig, inputDataConfig); killer.killAll(); - EasyMock.verify(listRequest, accountConfig, inputDataConfig, storage); + EasyMock.verify(accountConfig, inputDataConfig, storage); } @Test public void test_killAll_nonrecoverableExceptionWhenListingObjects_doesntDeleteAnyTaskLogs() { boolean ioExceptionThrown = false; - Storage.Objects.List listRequest = null; try { - StorageObject object1 = GoogleTestUtils.newStorageObject(BUCKET, KEY_1, TIME_0); - - listRequest = GoogleTestUtils.expectListRequest(storage, PREFIX_URI); + GoogleStorageObjectMetadata object1 = GoogleTestUtils.newStorageObject(BUCKET, KEY_1, TIME_0); - GoogleTestUtils.expectListObjects( - listRequest, - PREFIX_URI, - MAX_KEYS, - ImmutableList.of(object1) - ); + GoogleTestUtils.expectListObjectsPageRequest(storage, PREFIX_URI, MAX_KEYS, ImmutableList.of(object1)); GoogleTestUtils.expectDeleteObjects( storage, @@ -259,7 +235,7 @@ public void test_killAll_nonrecoverableExceptionWhenListingObjects_doesntDeleteA EasyMock.expect(accountConfig.getPrefix()).andReturn(PREFIX).anyTimes(); EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_KEYS); - EasyMock.replay(listRequest, accountConfig, inputDataConfig, storage); + EasyMock.replay(accountConfig, inputDataConfig, storage); GoogleDataSegmentKiller killer = new GoogleDataSegmentKiller(storage, accountConfig, inputDataConfig); killer.killAll(); @@ -270,6 +246,6 @@ public void test_killAll_nonrecoverableExceptionWhenListingObjects_doesntDeleteA Assert.assertTrue(ioExceptionThrown); - EasyMock.verify(listRequest, accountConfig, inputDataConfig, storage); + EasyMock.verify(accountConfig, inputDataConfig, storage); } } diff --git a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleDataSegmentPullerTest.java b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleDataSegmentPullerTest.java index deb2383fd6c9..059432ba8188 100644 --- a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleDataSegmentPullerTest.java +++ b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleDataSegmentPullerTest.java @@ -52,7 +52,7 @@ public void testDeleteOutputDirectoryWhenErrorIsRaisedPullingSegmentFiles() 300, "test" ); - EasyMock.expect(storage.get(EasyMock.eq(BUCKET), EasyMock.eq(PATH))).andThrow(exception); + EasyMock.expect(storage.getInputStream(EasyMock.eq(BUCKET), EasyMock.eq(PATH))).andThrow(exception); replayAll(); @@ -93,7 +93,7 @@ public void testGetInputStreamBucketNameWithUnderscores() throws IOException String prefix = "prefix/"; GoogleStorage storage = createMock(GoogleStorage.class); - EasyMock.expect(storage.get(EasyMock.eq(bucket), EasyMock.eq(prefix))).andReturn(EasyMock.createMock(InputStream.class)); + EasyMock.expect(storage.getInputStream(EasyMock.eq(bucket), EasyMock.eq(prefix))).andReturn(EasyMock.createMock(InputStream.class)); EasyMock.replay(storage); GoogleDataSegmentPuller puller = new GoogleDataSegmentPuller(storage); diff --git a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleStorageDruidModuleTest.java b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleStorageDruidModuleTest.java index 9ca384cf4682..e45877c89f4d 100644 --- a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleStorageDruidModuleTest.java +++ b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleStorageDruidModuleTest.java @@ -19,14 +19,9 @@ package org.apache.druid.storage.google; -import com.google.api.client.googleapis.testing.auth.oauth2.MockGoogleCredential; -import com.google.api.client.http.HttpRequestInitializer; -import com.google.api.client.http.HttpTransport; -import com.google.api.client.json.JsonFactory; -import com.google.api.services.storage.Storage; +import com.google.cloud.storage.Storage; import com.google.common.collect.ImmutableList; import com.google.inject.Injector; -import org.apache.druid.common.gcp.GcpMockModule; import org.apache.druid.guice.GuiceInjectors; import org.apache.druid.segment.loading.OmniDataSegmentKiller; import org.junit.Assert; @@ -42,23 +37,7 @@ public void testSegmentKillerBoundedSingleton() // HttpRquestInitializer, the test throws an exception from that method, meaning that if they are not loaded // lazily, the exception should end up thrown. // 2. That the same object is returned. - Injector injector = GuiceInjectors.makeStartupInjectorWithModules( - ImmutableList.of( - new GcpMockModule() - { - - @Override - public HttpRequestInitializer mockRequestInitializer( - HttpTransport transport, - JsonFactory factory - ) - { - return new MockGoogleCredential.Builder().setTransport(transport).setJsonFactory(factory).build(); - } - }, - new GoogleStorageDruidModule() - ) - ); + Injector injector = GuiceInjectors.makeStartupInjectorWithModules(ImmutableList.of(new GoogleStorageDruidModule())); OmniDataSegmentKiller killer = injector.getInstance(OmniDataSegmentKiller.class); Assert.assertTrue(killer.getKillers().containsKey(GoogleStorageDruidModule.SCHEME)); Assert.assertSame( @@ -78,23 +57,7 @@ public void testLazyInstantiation() // HttpRquestInitializer, the test throws an exception from that method, meaning that if they are not loaded // lazily, the exception should end up thrown. // 2. That the same object is returned. - Injector injector = GuiceInjectors.makeStartupInjectorWithModules( - ImmutableList.of( - new GcpMockModule() - { - - @Override - public HttpRequestInitializer mockRequestInitializer( - HttpTransport transport, - JsonFactory factory - ) - { - throw new UnsupportedOperationException("should not be called, because this should be lazy"); - } - }, - new GoogleStorageDruidModule() - ) - ); + Injector injector = GuiceInjectors.makeStartupInjectorWithModules(ImmutableList.of(new GoogleStorageDruidModule())); final GoogleStorage instance = injector.getInstance(GoogleStorage.class); Assert.assertSame(instance, injector.getInstance(GoogleStorage.class)); } diff --git a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleStorageTest.java b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleStorageTest.java index 848cf97fed17..d92339f53c79 100644 --- a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleStorageTest.java +++ b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleStorageTest.java @@ -19,73 +19,243 @@ package org.apache.druid.storage.google; -import com.google.api.client.googleapis.testing.auth.oauth2.MockGoogleCredential; -import com.google.api.client.http.ByteArrayContent; -import com.google.api.client.http.HttpRequestInitializer; -import com.google.api.client.json.jackson2.JacksonFactory; -import com.google.api.client.testing.http.MockHttpTransport; -import com.google.api.client.testing.http.MockLowLevelHttpRequest; -import com.google.api.client.testing.http.MockLowLevelHttpResponse; -import com.google.api.services.storage.Storage; -import com.google.common.base.Suppliers; -import org.apache.druid.java.util.common.StringUtils; -import org.junit.Assert; +import com.google.api.gax.paging.Page; +import com.google.cloud.storage.Blob; +import com.google.cloud.storage.BlobId; +import com.google.cloud.storage.Storage; +import com.google.common.collect.ImmutableList; +import org.easymock.Capture; +import org.easymock.EasyMock; +import org.junit.Before; import org.junit.Test; import java.io.IOException; -import java.io.InputStream; +import java.time.OffsetDateTime; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; public class GoogleStorageTest { + Storage mockStorage; + GoogleStorage googleStorage; + + Blob blob; + + static final String BUCKET = "bucket"; + static final String PATH = "/path"; + static final long SIZE = 100; + static final OffsetDateTime UPDATE_TIME = OffsetDateTime.MIN; + + @Before + public void setUp() + { + mockStorage = EasyMock.mock(Storage.class); + + googleStorage = new GoogleStorage(() -> mockStorage); + + blob = EasyMock.mock(Blob.class); + } + + @Test + public void testDeleteSuccess() throws IOException + { + EasyMock.expect(mockStorage.delete(EasyMock.eq(BUCKET), EasyMock.eq(PATH))).andReturn(true); + EasyMock.replay(mockStorage); + googleStorage.delete(BUCKET, PATH); + } + + @Test + public void testDeleteFailure() + { + EasyMock.expect(mockStorage.delete(EasyMock.eq(BUCKET), EasyMock.eq(PATH))).andReturn(false); + EasyMock.replay(mockStorage); + boolean thrownIOException = false; + try { + googleStorage.delete(BUCKET, PATH); + + } + catch (IOException e) { + thrownIOException = true; + } + assertTrue(thrownIOException); + } + @Test - public void testGet() throws IOException + public void testBatchDeleteSuccess() throws IOException { - String content = "abcdefghij"; - MockLowLevelHttpResponse response = new MockLowLevelHttpResponse(); - response.setContent(content); - GoogleStorage googleStorage = makeGoogleStorage(response); - InputStream is = googleStorage.get("bucket", "path"); - String actual = GoogleTestUtils.readAsString(is); - Assert.assertEquals(content, actual); + List paths = ImmutableList.of("/path1", "/path2"); + final Capture> pathIterable = Capture.newInstance(); + EasyMock.expect(mockStorage.delete(EasyMock.capture(pathIterable))).andReturn(ImmutableList.of(true, true)); + EasyMock.replay(mockStorage); + + googleStorage.batchDelete(BUCKET, paths); + + List recordedBlobIds = new ArrayList<>(); + pathIterable.getValue().iterator().forEachRemaining(recordedBlobIds::add); + + List recordedPaths = recordedBlobIds.stream().map(BlobId::getName).collect(Collectors.toList()); + + assertTrue(paths.size() == recordedPaths.size() && paths.containsAll(recordedPaths) && recordedPaths.containsAll( + paths)); + assertEquals(BUCKET, recordedBlobIds.get(0).getBucket()); } @Test - public void testGetWithOffset() throws IOException + public void testBatchDeleteFailure() { - String content = "abcdefghij"; - MockLowLevelHttpResponse response = new MockLowLevelHttpResponse(); - response.setContent(content); - GoogleStorage googleStorage = makeGoogleStorage(response); - InputStream is = googleStorage.get("bucket", "path", 2); - String actual = GoogleTestUtils.readAsString(is); - Assert.assertEquals(content.substring(2), actual); + List paths = ImmutableList.of("/path1", "/path2"); + EasyMock.expect(mockStorage.delete((Iterable) EasyMock.anyObject())) + .andReturn(ImmutableList.of(false, true)); + EasyMock.replay(mockStorage); + boolean thrownIOException = false; + try { + googleStorage.batchDelete(BUCKET, paths); + + } + catch (IOException e) { + thrownIOException = true; + } + assertTrue(thrownIOException); } @Test - public void testInsert() throws IOException + public void testGetMetadata() throws IOException { - String content = "abcdefghij"; - MockLowLevelHttpResponse response = new MockLowLevelHttpResponse(); - response.addHeader("Location", "http://random-path"); - response.setContent("{}"); - MockHttpTransport transport = new MockHttpTransport.Builder().setLowLevelHttpResponse(response).build(); - GoogleStorage googleStorage = makeGoogleStorage(transport); - googleStorage.insert("bucket", "path", new ByteArrayContent("text/html", StringUtils.toUtf8(content))); - MockLowLevelHttpRequest request = transport.getLowLevelHttpRequest(); - String actual = request.getContentAsString(); - Assert.assertEquals(content, actual); + EasyMock.expect(mockStorage.get( + EasyMock.eq(BUCKET), + EasyMock.eq(PATH), + EasyMock.anyObject(Storage.BlobGetOption.class) + )).andReturn(blob); + + EasyMock.expect(blob.getBucket()).andReturn(BUCKET); + EasyMock.expect(blob.getName()).andReturn(PATH); + EasyMock.expect(blob.getSize()).andReturn(SIZE); + EasyMock.expect(blob.getUpdateTimeOffsetDateTime()).andReturn(UPDATE_TIME); + + EasyMock.replay(mockStorage, blob); + + GoogleStorageObjectMetadata objectMetadata = googleStorage.getMetadata(BUCKET, PATH); + assertEquals(objectMetadata, new GoogleStorageObjectMetadata(BUCKET, PATH, SIZE, UPDATE_TIME.toEpochSecond())); + } - private GoogleStorage makeGoogleStorage(MockLowLevelHttpResponse response) + @Test + public void testExistsTrue() { - MockHttpTransport transport = new MockHttpTransport.Builder().setLowLevelHttpResponse(response).build(); - return makeGoogleStorage(transport); + EasyMock.expect(mockStorage.get(EasyMock.eq(BUCKET), EasyMock.eq(PATH))).andReturn(blob); + EasyMock.replay(mockStorage); + assertTrue(googleStorage.exists(BUCKET, PATH)); } - private GoogleStorage makeGoogleStorage(MockHttpTransport transport) + @Test + public void testExistsFalse() { - HttpRequestInitializer initializer = new MockGoogleCredential.Builder().build(); - Storage storage = new Storage(transport, JacksonFactory.getDefaultInstance(), initializer); - return new GoogleStorage(Suppliers.ofInstance(storage)); + EasyMock.expect(mockStorage.get(EasyMock.eq(BUCKET), EasyMock.eq(PATH))).andReturn(null); + EasyMock.replay(mockStorage); + assertFalse(googleStorage.exists(BUCKET, PATH)); + } + + @Test + public void testSize() throws IOException + { + EasyMock.expect(mockStorage.get( + EasyMock.eq(BUCKET), + EasyMock.eq(PATH), + EasyMock.anyObject(Storage.BlobGetOption.class) + )).andReturn(blob); + + EasyMock.expect(blob.getSize()).andReturn(SIZE); + + EasyMock.replay(mockStorage, blob); + + long size = googleStorage.size(BUCKET, PATH); + + assertEquals(size, SIZE); + } + + @Test + public void testVersion() throws IOException + { + final String version = "7"; + EasyMock.expect(mockStorage.get( + EasyMock.eq(BUCKET), + EasyMock.eq(PATH), + EasyMock.anyObject(Storage.BlobGetOption.class) + )).andReturn(blob); + + EasyMock.expect(blob.getGeneratedId()).andReturn(version); + + EasyMock.replay(mockStorage, blob); + + assertEquals(version, googleStorage.version(BUCKET, PATH)); + } + + @Test + public void testList() throws IOException + { + Page blobPage = EasyMock.mock(Page.class); + EasyMock.expect(mockStorage.list( + EasyMock.eq(BUCKET), + EasyMock.anyObject() + )).andReturn(blobPage); + + Blob blob1 = EasyMock.mock(Blob.class); + Blob blob2 = EasyMock.mock(Blob.class); + + final String bucket1 = "BUCKET_1"; + final String path1 = "PATH_1"; + final long size1 = 7; + final OffsetDateTime updateTime1 = OffsetDateTime.MIN; + + final String bucket2 = "BUCKET_2"; + final String path2 = "PATH_2"; + final long size2 = 9; + final OffsetDateTime updateTime2 = OffsetDateTime.MIN; + + final String nextPageToken = "TOKEN"; + + EasyMock.expect(blob1.getBucket()).andReturn(bucket1); + EasyMock.expect(blob1.getName()).andReturn(path1); + EasyMock.expect(blob1.getSize()).andReturn(size1); + EasyMock.expect(blob1.getUpdateTimeOffsetDateTime()).andReturn(updateTime1); + + EasyMock.expect(blob2.getBucket()).andReturn(bucket2); + EasyMock.expect(blob2.getName()).andReturn(path2); + EasyMock.expect(blob2.getSize()).andReturn(size2); + EasyMock.expect(blob2.getUpdateTimeOffsetDateTime()).andReturn(updateTime2); + + + List blobs = ImmutableList.of(blob1, blob2); + + EasyMock.expect(blobPage.streamValues()).andReturn(blobs.stream()); + + EasyMock.expect(blobPage.getNextPageToken()).andReturn(nextPageToken); + + + EasyMock.replay(mockStorage, blobPage, blob1, blob2); + + GoogleStorageObjectMetadata objectMetadata1 = new GoogleStorageObjectMetadata( + bucket1, + path1, + size1, + updateTime1.toEpochSecond() + ); + GoogleStorageObjectMetadata objectMetadata2 = new GoogleStorageObjectMetadata( + bucket2, + path2, + size2, + updateTime2.toEpochSecond() + ); + + GoogleStorageObjectPage objectPage = googleStorage.list(BUCKET, PATH, null, null); + + assertEquals(objectPage.getObjectList().get(0), objectMetadata1); + assertEquals(objectPage.getObjectList().get(1), objectMetadata2); + assertEquals(objectPage.getNextPageToken(), nextPageToken); } } diff --git a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleTaskLogsTest.java b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleTaskLogsTest.java index 9bfe2706f803..a0f17c97d910 100644 --- a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleTaskLogsTest.java +++ b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleTaskLogsTest.java @@ -22,8 +22,6 @@ import com.google.api.client.http.HttpHeaders; import com.google.api.client.http.HttpResponseException; import com.google.api.client.http.InputStreamContent; -import com.google.api.services.storage.Storage; -import com.google.api.services.storage.model.StorageObject; import com.google.common.base.Optional; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -146,7 +144,7 @@ public void testStreamTaskLogWithoutOffset() throws Exception final String logPath = PREFIX + "/" + TASKID; EasyMock.expect(storage.exists(BUCKET, logPath)).andReturn(true); EasyMock.expect(storage.size(BUCKET, logPath)).andReturn((long) testLog.length()); - EasyMock.expect(storage.get(BUCKET, logPath, 0)).andReturn(new ByteArrayInputStream(StringUtils.toUtf8(testLog))); + EasyMock.expect(storage.getInputStream(BUCKET, logPath, 0)).andReturn(new ByteArrayInputStream(StringUtils.toUtf8(testLog))); replayAll(); @@ -168,7 +166,7 @@ public void testStreamTaskLogWithPositiveOffset() throws Exception final String logPath = PREFIX + "/" + TASKID; EasyMock.expect(storage.exists(BUCKET, logPath)).andReturn(true); EasyMock.expect(storage.size(BUCKET, logPath)).andReturn((long) testLog.length()); - EasyMock.expect(storage.get(BUCKET, logPath, offset)) + EasyMock.expect(storage.getInputStream(BUCKET, logPath, offset)) .andReturn(new ByteArrayInputStream(StringUtils.toUtf8(expectedLog))); replayAll(); @@ -192,7 +190,7 @@ public void testStreamTaskLogWithNegative() throws Exception final String logPath = PREFIX + "/" + TASKID; EasyMock.expect(storage.exists(BUCKET, logPath)).andReturn(true); EasyMock.expect(storage.size(BUCKET, logPath)).andReturn((long) testLog.length()); - EasyMock.expect(storage.get(BUCKET, logPath, internalOffset)) + EasyMock.expect(storage.getInputStream(BUCKET, logPath, internalOffset)) .andReturn(new ByteArrayInputStream(StringUtils.toUtf8(expectedLog))); replayAll(); @@ -214,7 +212,7 @@ public void testStreamTaskStatus() throws Exception final String logPath = PREFIX + "/" + TASKID + ".status.json"; EasyMock.expect(storage.exists(BUCKET, logPath)).andReturn(true); EasyMock.expect(storage.size(BUCKET, logPath)).andReturn((long) taskStatus.length()); - EasyMock.expect(storage.get(BUCKET, logPath, 0)).andReturn(new ByteArrayInputStream(StringUtils.toUtf8(taskStatus))); + EasyMock.expect(storage.getInputStream(BUCKET, logPath, 0)).andReturn(new ByteArrayInputStream(StringUtils.toUtf8(taskStatus))); replayAll(); @@ -230,18 +228,11 @@ public void testStreamTaskStatus() throws Exception @Test public void test_killAll_noException_deletesAllTaskLogs() throws IOException { - StorageObject object1 = GoogleTestUtils.newStorageObject(BUCKET, KEY_1, TIME_0); - StorageObject object2 = GoogleTestUtils.newStorageObject(BUCKET, KEY_2, TIME_1); + GoogleStorageObjectMetadata object1 = GoogleTestUtils.newStorageObject(BUCKET, KEY_1, TIME_0); + GoogleStorageObjectMetadata object2 = GoogleTestUtils.newStorageObject(BUCKET, KEY_2, TIME_1); EasyMock.expect(timeSupplier.getAsLong()).andReturn(TIME_NOW); - Storage.Objects.List listRequest = GoogleTestUtils.expectListRequest(storage, PREFIX_URI); - - GoogleTestUtils.expectListObjects( - listRequest, - PREFIX_URI, - MAX_KEYS, - ImmutableList.of(object1, object2) - ); + GoogleTestUtils.expectListObjectsPageRequest(storage, PREFIX_URI, MAX_KEYS, ImmutableList.of(object1, object2)); GoogleTestUtils.expectDeleteObjects( storage, @@ -250,29 +241,22 @@ public void test_killAll_noException_deletesAllTaskLogs() throws IOException ); EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_KEYS); - EasyMock.replay(listRequest, inputDataConfig, storage, timeSupplier); + EasyMock.replay(inputDataConfig, storage, timeSupplier); googleTaskLogs.killAll(); - EasyMock.verify(listRequest, inputDataConfig, storage, timeSupplier); + EasyMock.verify(inputDataConfig, storage, timeSupplier); } @Test public void test_killAll_recoverableExceptionWhenDeletingObjects_deletesAllTaskLogs() throws IOException { - StorageObject object1 = GoogleTestUtils.newStorageObject(BUCKET, KEY_1, TIME_0); + GoogleStorageObjectMetadata object1 = GoogleTestUtils.newStorageObject(BUCKET, KEY_1, TIME_0); EasyMock.expect(timeSupplier.getAsLong()).andReturn(TIME_NOW); - Storage.Objects.List listRequest = GoogleTestUtils.expectListRequest(storage, PREFIX_URI); - - GoogleTestUtils.expectListObjects( - listRequest, - PREFIX_URI, - MAX_KEYS, - ImmutableList.of(object1) - ); + GoogleTestUtils.expectListObjectsPageRequest(storage, PREFIX_URI, MAX_KEYS, ImmutableList.of(object1)); GoogleTestUtils.expectDeleteObjects( storage, @@ -282,31 +266,23 @@ public void test_killAll_recoverableExceptionWhenDeletingObjects_deletesAllTaskL EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_KEYS); - EasyMock.replay(listRequest, inputDataConfig, storage, timeSupplier); + EasyMock.replay(inputDataConfig, storage, timeSupplier); googleTaskLogs.killAll(); - EasyMock.verify(listRequest, inputDataConfig, storage, timeSupplier); + EasyMock.verify(inputDataConfig, storage, timeSupplier); } @Test public void test_killAll_nonrecoverableExceptionWhenListingObjects_doesntDeleteAnyTaskLogs() { boolean ioExceptionThrown = false; - Storage.Objects.List listRequest = null; try { - StorageObject object1 = GoogleTestUtils.newStorageObject(BUCKET, KEY_1, TIME_0); + GoogleStorageObjectMetadata object1 = GoogleTestUtils.newStorageObject(BUCKET, KEY_1, TIME_0); EasyMock.expect(timeSupplier.getAsLong()).andReturn(TIME_NOW); - listRequest = GoogleTestUtils.expectListRequest(storage, PREFIX_URI); - - GoogleTestUtils.expectListObjects( - listRequest, - PREFIX_URI, - MAX_KEYS, - ImmutableList.of(object1) - ); + GoogleTestUtils.expectListObjectsPageRequest(storage, PREFIX_URI, MAX_KEYS, ImmutableList.of(object1)); GoogleTestUtils.expectDeleteObjects( storage, @@ -316,7 +292,7 @@ public void test_killAll_nonrecoverableExceptionWhenListingObjects_doesntDeleteA EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_KEYS); - EasyMock.replay(listRequest, inputDataConfig, storage, timeSupplier); + EasyMock.replay(inputDataConfig, storage, timeSupplier); googleTaskLogs.killAll(); } @@ -326,23 +302,16 @@ public void test_killAll_nonrecoverableExceptionWhenListingObjects_doesntDeleteA Assert.assertTrue(ioExceptionThrown); - EasyMock.verify(listRequest, inputDataConfig, storage, timeSupplier); + EasyMock.verify(inputDataConfig, storage, timeSupplier); } @Test public void test_killOlderThan_noException_deletesOnlyTaskLogsOlderThan() throws IOException { - StorageObject object1 = GoogleTestUtils.newStorageObject(BUCKET, KEY_1, TIME_0); - StorageObject object2 = GoogleTestUtils.newStorageObject(BUCKET, KEY_2, TIME_FUTURE); + GoogleStorageObjectMetadata object1 = GoogleTestUtils.newStorageObject(BUCKET, KEY_1, TIME_0); + GoogleStorageObjectMetadata object2 = GoogleTestUtils.newStorageObject(BUCKET, KEY_2, TIME_FUTURE); - Storage.Objects.List listRequest = GoogleTestUtils.expectListRequest(storage, PREFIX_URI); - - GoogleTestUtils.expectListObjects( - listRequest, - PREFIX_URI, - MAX_KEYS, - ImmutableList.of(object1, object2) - ); + GoogleTestUtils.expectListObjectsPageRequest(storage, PREFIX_URI, MAX_KEYS, ImmutableList.of(object1, object2)); GoogleTestUtils.expectDeleteObjects( storage, @@ -351,25 +320,18 @@ public void test_killOlderThan_noException_deletesOnlyTaskLogsOlderThan() throws ); EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_KEYS); - EasyMock.replay(listRequest, inputDataConfig, storage); + EasyMock.replay(inputDataConfig, storage); googleTaskLogs.killOlderThan(TIME_NOW); - EasyMock.verify(listRequest, inputDataConfig, storage); + EasyMock.verify(inputDataConfig, storage); } @Test public void test_killOlderThan_recoverableExceptionWhenListingObjects_deletesAllTaskLogs() throws IOException { - StorageObject object1 = GoogleTestUtils.newStorageObject(BUCKET, KEY_1, TIME_0); - - Storage.Objects.List listRequest = GoogleTestUtils.expectListRequest(storage, PREFIX_URI); + GoogleStorageObjectMetadata object1 = GoogleTestUtils.newStorageObject(BUCKET, KEY_1, TIME_0); - GoogleTestUtils.expectListObjects( - listRequest, - PREFIX_URI, - MAX_KEYS, - ImmutableList.of(object1) - ); + GoogleTestUtils.expectListObjectsPageRequest(storage, PREFIX_URI, MAX_KEYS, ImmutableList.of(object1)); GoogleTestUtils.expectDeleteObjects( storage, @@ -379,29 +341,21 @@ public void test_killOlderThan_recoverableExceptionWhenListingObjects_deletesAll EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_KEYS); - EasyMock.replay(listRequest, inputDataConfig, storage); + EasyMock.replay(inputDataConfig, storage); googleTaskLogs.killOlderThan(TIME_NOW); - EasyMock.verify(listRequest, inputDataConfig, storage); + EasyMock.verify(inputDataConfig, storage); } @Test public void test_killOlderThan_nonrecoverableExceptionWhenListingObjects_doesntDeleteAnyTaskLogs() { boolean ioExceptionThrown = false; - Storage.Objects.List listRequest = null; try { - StorageObject object1 = GoogleTestUtils.newStorageObject(BUCKET, KEY_1, TIME_0); - - listRequest = GoogleTestUtils.expectListRequest(storage, PREFIX_URI); + GoogleStorageObjectMetadata object1 = GoogleTestUtils.newStorageObject(BUCKET, KEY_1, TIME_0); - GoogleTestUtils.expectListObjects( - listRequest, - PREFIX_URI, - MAX_KEYS, - ImmutableList.of(object1) - ); + GoogleTestUtils.expectListObjectsPageRequest(storage, PREFIX_URI, MAX_KEYS, ImmutableList.of(object1)); GoogleTestUtils.expectDeleteObjects( storage, @@ -411,7 +365,7 @@ public void test_killOlderThan_nonrecoverableExceptionWhenListingObjects_doesntD EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_KEYS); - EasyMock.replay(listRequest, inputDataConfig, storage); + EasyMock.replay(inputDataConfig, storage); googleTaskLogs.killOlderThan(TIME_NOW); } @@ -421,6 +375,6 @@ public void test_killOlderThan_nonrecoverableExceptionWhenListingObjects_doesntD Assert.assertTrue(ioExceptionThrown); - EasyMock.verify(listRequest, inputDataConfig, storage); + EasyMock.verify(inputDataConfig, storage); } } diff --git a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleTestUtils.java b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleTestUtils.java index 219d96c21662..c68911448e26 100644 --- a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleTestUtils.java +++ b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleTestUtils.java @@ -19,21 +19,17 @@ package org.apache.druid.storage.google; -import com.google.api.client.util.DateTime; -import com.google.api.services.storage.Storage; -import com.google.api.services.storage.model.Objects; -import com.google.api.services.storage.model.StorageObject; import org.apache.commons.io.IOUtils; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.StringUtils; import org.easymock.EasyMock; import org.easymock.EasyMockSupport; import org.easymock.IExpectationSetters; +import org.joda.time.DateTime; import java.io.IOException; import java.io.InputStream; import java.io.StringWriter; -import java.math.BigInteger; import java.net.URI; import java.util.HashMap; import java.util.List; @@ -41,79 +37,60 @@ public class GoogleTestUtils extends EasyMockSupport { - private static final org.joda.time.DateTime NOW = DateTimes.nowUtc(); + private static final DateTime NOW = DateTimes.nowUtc(); private static final byte[] CONTENT = StringUtils.toUtf8(StringUtils.format("%d,hello,world", NOW.getMillis())); - public static StorageObject newStorageObject( + public static GoogleStorageObjectMetadata newStorageObject( String bucket, String key, long lastModifiedTimestamp ) { - StorageObject object = new StorageObject(); - object.setBucket(bucket); - object.setName(key); - object.setUpdated(new DateTime(lastModifiedTimestamp)); - object.setEtag("etag"); - object.setSize(BigInteger.valueOf(CONTENT.length)); + GoogleStorageObjectMetadata object = new GoogleStorageObjectMetadata(bucket, key, (long) CONTENT.length, + lastModifiedTimestamp + ); return object; } - public static Storage.Objects.List expectListRequest( + public static void expectListObjectsPageRequest( GoogleStorage storage, - URI prefix - ) throws IOException - { - Storage.Objects.List listRequest = EasyMock.createMock(Storage.Objects.List.class); - String bucket = prefix.getAuthority(); - EasyMock.expect( - storage.list(bucket) - ).andReturn(listRequest).once(); - return listRequest; - } - - public static void expectListObjects( - Storage.Objects.List listRequest, URI prefix, long maxListingLength, - List objects + List objectMetadataList ) throws IOException { - EasyMock.expect(listRequest.setPrefix(StringUtils.maybeRemoveLeadingSlash(prefix.getPath()))).andReturn(listRequest); - EasyMock.expect(listRequest.setMaxResults(maxListingLength)).andReturn(listRequest); - EasyMock.expect(listRequest.setPageToken(EasyMock.anyString())).andReturn(listRequest).anyTimes(); - - Objects resultObjects = new Objects(); - resultObjects.setItems(objects); - - EasyMock.expect( - listRequest.execute() - ).andReturn(resultObjects).once(); + GoogleStorageObjectPage objectMetadataPage = new GoogleStorageObjectPage(objectMetadataList, null); + String bucket = prefix.getAuthority(); + EasyMock.expect(storage.list(bucket, StringUtils.maybeRemoveLeadingSlash(prefix.getPath()), maxListingLength, null)) + .andReturn(objectMetadataPage) + .once(); } public static void expectDeleteObjects( GoogleStorage storage, - List deleteObjectExpected, - Map deleteObjectToException + List deleteObjectExpected, + Map deleteObjectToException ) throws IOException { - Map> requestToResultExpectationSetter = new HashMap<>(); - for (Map.Entry deleteObjectAndException : deleteObjectToException.entrySet()) { - StorageObject deleteObject = deleteObjectAndException.getKey(); + Map> requestToResultExpectationSetter = new HashMap<>(); + for (Map.Entry deleteObjectAndException : deleteObjectToException.entrySet()) { + GoogleStorageObjectMetadata deleteObject = deleteObjectAndException.getKey(); Exception exception = deleteObjectAndException.getValue(); - IExpectationSetters resultExpectationSetter = requestToResultExpectationSetter.get(deleteObject); + IExpectationSetters resultExpectationSetter = requestToResultExpectationSetter.get( + deleteObject); if (resultExpectationSetter == null) { storage.delete(deleteObject.getBucket(), deleteObject.getName()); - resultExpectationSetter = EasyMock.expectLastCall().andThrow(exception); + resultExpectationSetter = EasyMock.expectLastCall().andThrow(exception); requestToResultExpectationSetter.put(deleteObject, resultExpectationSetter); } else { resultExpectationSetter.andThrow(exception); } } - for (StorageObject deleteObject : deleteObjectExpected) { - IExpectationSetters resultExpectationSetter = requestToResultExpectationSetter.get(deleteObject); + for (GoogleStorageObjectMetadata deleteObject : deleteObjectExpected) { + IExpectationSetters resultExpectationSetter = requestToResultExpectationSetter.get( + deleteObject); if (resultExpectationSetter == null) { storage.delete(deleteObject.getBucket(), deleteObject.getName()); resultExpectationSetter = EasyMock.expectLastCall(); diff --git a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleTimestampVersionedDataFinderTest.java b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleTimestampVersionedDataFinderTest.java index 408033db053e..b9417b7f7f0e 100644 --- a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleTimestampVersionedDataFinderTest.java +++ b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleTimestampVersionedDataFinderTest.java @@ -19,8 +19,6 @@ package org.apache.druid.storage.google; -import com.google.api.client.util.DateTime; -import com.google.api.services.storage.model.StorageObject; import com.google.common.collect.ImmutableList; import org.apache.druid.java.util.common.StringUtils; import org.junit.Assert; @@ -38,14 +36,14 @@ public void getLatestVersion() String keyPrefix = "prefix/dir/0"; // object for directory prefix/dir/0/ - final StorageObject storageObject1 = ObjectStorageIteratorTest.makeStorageObject(bucket, keyPrefix + "//", 0); - storageObject1.setUpdated(new DateTime(System.currentTimeMillis())); - final StorageObject storageObject2 = ObjectStorageIteratorTest.makeStorageObject(bucket, keyPrefix + "/v1", 1); - storageObject2.setUpdated(new DateTime(System.currentTimeMillis())); - final StorageObject storageObject3 = ObjectStorageIteratorTest.makeStorageObject(bucket, keyPrefix + "/v2", 1); - storageObject3.setUpdated(new DateTime(System.currentTimeMillis() + 100)); - final StorageObject storageObject4 = ObjectStorageIteratorTest.makeStorageObject(bucket, keyPrefix + "/other", 4); - storageObject4.setUpdated(new DateTime(System.currentTimeMillis() + 100)); + final GoogleStorageObjectMetadata storageObject1 = ObjectStorageIteratorTest.makeStorageObject(bucket, keyPrefix + "//", 0); + storageObject1.setLastUpdateTime(System.currentTimeMillis()); + final GoogleStorageObjectMetadata storageObject2 = ObjectStorageIteratorTest.makeStorageObject(bucket, keyPrefix + "/v1", 1); + storageObject2.setLastUpdateTime(System.currentTimeMillis()); + final GoogleStorageObjectMetadata storageObject3 = ObjectStorageIteratorTest.makeStorageObject(bucket, keyPrefix + "/v2", 1); + storageObject3.setLastUpdateTime(System.currentTimeMillis() + 100); + final GoogleStorageObjectMetadata storageObject4 = ObjectStorageIteratorTest.makeStorageObject(bucket, keyPrefix + "/other", 4); + storageObject4.setLastUpdateTime(System.currentTimeMillis() + 100); final GoogleStorage storage = ObjectStorageIteratorTest.makeMockClient(ImmutableList.of(storageObject1, storageObject2, storageObject3, storageObject4)); final GoogleTimestampVersionedDataFinder finder = new GoogleTimestampVersionedDataFinder(storage); @@ -62,14 +60,14 @@ public void getLatestVersionTrailingSlashKeyPrefix() String keyPrefix = "prefix/dir/0/"; // object for directory prefix/dir/0/ - final StorageObject storageObject1 = ObjectStorageIteratorTest.makeStorageObject(bucket, keyPrefix + "/", 0); - storageObject1.setUpdated(new DateTime(System.currentTimeMillis())); - final StorageObject storageObject2 = ObjectStorageIteratorTest.makeStorageObject(bucket, keyPrefix + "v1", 1); - storageObject2.setUpdated(new DateTime(System.currentTimeMillis())); - final StorageObject storageObject3 = ObjectStorageIteratorTest.makeStorageObject(bucket, keyPrefix + "v2", 1); - storageObject3.setUpdated(new DateTime(System.currentTimeMillis() + 100)); - final StorageObject storageObject4 = ObjectStorageIteratorTest.makeStorageObject(bucket, keyPrefix + "other", 4); - storageObject4.setUpdated(new DateTime(System.currentTimeMillis() + 100)); + final GoogleStorageObjectMetadata storageObject1 = ObjectStorageIteratorTest.makeStorageObject(bucket, keyPrefix + "/", 0); + storageObject1.setLastUpdateTime(System.currentTimeMillis()); + final GoogleStorageObjectMetadata storageObject2 = ObjectStorageIteratorTest.makeStorageObject(bucket, keyPrefix + "v1", 1); + storageObject2.setLastUpdateTime(System.currentTimeMillis()); + final GoogleStorageObjectMetadata storageObject3 = ObjectStorageIteratorTest.makeStorageObject(bucket, keyPrefix + "v2", 1); + storageObject3.setLastUpdateTime(System.currentTimeMillis() + 100); + final GoogleStorageObjectMetadata storageObject4 = ObjectStorageIteratorTest.makeStorageObject(bucket, keyPrefix + "other", 4); + storageObject4.setLastUpdateTime(System.currentTimeMillis() + 100); final GoogleStorage storage = ObjectStorageIteratorTest.makeMockClient(ImmutableList.of(storageObject1, storageObject2, storageObject3, storageObject4)); final GoogleTimestampVersionedDataFinder finder = new GoogleTimestampVersionedDataFinder(storage); diff --git a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/ObjectStorageIteratorTest.java b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/ObjectStorageIteratorTest.java index a0b2bd8f2973..a1f227ab5d48 100644 --- a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/ObjectStorageIteratorTest.java +++ b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/ObjectStorageIteratorTest.java @@ -19,19 +19,11 @@ package org.apache.druid.storage.google; -import com.google.api.client.http.HttpRequestInitializer; -import com.google.api.client.http.HttpTransport; -import com.google.api.client.json.JsonFactory; -import com.google.api.services.storage.Storage; -import com.google.api.services.storage.model.StorageObject; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; -import org.apache.druid.storage.google.ObjectStorageIteratorTest.MockStorage.MockObjects.MockList; -import org.easymock.EasyMock; import org.junit.Assert; import org.junit.Test; -import java.math.BigInteger; import java.net.URI; import java.util.ArrayList; import java.util.List; @@ -39,7 +31,7 @@ public class ObjectStorageIteratorTest { - private static final ImmutableList TEST_OBJECTS = + private static final ImmutableList TEST_OBJECTS = ImmutableList.of( makeStorageObject("b", "foo", 10L), makeStorageObject("b", "foo/", 0L), // directory @@ -163,11 +155,11 @@ private static void test( final int maxListingLength ) { - final List expectedObjects = new ArrayList<>(); + final List expectedObjects = new ArrayList<>(); // O(N^2) but who cares -- the list is short. for (final String uri : expectedUris) { - final List matches = TEST_OBJECTS + final List matches = TEST_OBJECTS .stream() .filter(storageObject -> GoogleUtils.objectToUri(storageObject).toString().equals(uri)) .collect(Collectors.toList()); @@ -175,7 +167,7 @@ private static void test( expectedObjects.add(Iterables.getOnlyElement(matches)); } - final List actualObjects = ImmutableList.copyOf( + final List actualObjects = ImmutableList.copyOf( GoogleUtils.lazyFetchingStorageObjectsIterator( makeMockClient(TEST_OBJECTS), prefixes.stream().map(URI::create).iterator(), @@ -194,70 +186,33 @@ private static void test( * Makes a mock Google Storage client that handles enough of "List" to test the functionality of the * {@link ObjectStorageIterator} class. */ - static GoogleStorage makeMockClient(final List storageObjects) + static GoogleStorage makeMockClient(final List storageObjects) { return new GoogleStorage(null) { @Override - public Storage.Objects.List list(final String bucket) + public GoogleStorageObjectPage list( + final String bucket, + final String prefix, + final Long pageSize, + final String pageToken + ) { - return mockList(bucket, storageObjects); - } - }; - } - - @SuppressWarnings("UnnecessaryFullyQualifiedName") - static class MockStorage extends Storage - { - private MockStorage() - { - super( - EasyMock.niceMock(HttpTransport.class), - EasyMock.niceMock(JsonFactory.class), - EasyMock.niceMock(HttpRequestInitializer.class) - ); - } - - private MockList mockList(String bucket, java.util.List storageObjects) - { - return new MockObjects().mockList(bucket, storageObjects); - } - - class MockObjects extends Storage.Objects - { - private MockList mockList(String bucket, java.util.List storageObjects) - { - return new MockList(bucket, storageObjects); - } - - class MockList extends Objects.List - { - private final java.util.List storageObjects; - - private MockList(String bucket, java.util.List storageObjects) - { - super(bucket); - this.storageObjects = storageObjects; - } - - @Override - public com.google.api.services.storage.model.Objects execute() { // Continuation token is an index in the "objects" list. - final String continuationToken = getPageToken(); - final int startIndex = continuationToken == null ? 0 : Integer.parseInt(continuationToken); + final int startIndex = pageToken == null ? 0 : Integer.parseInt(pageToken); // Find matching objects. - java.util.List objects = new ArrayList<>(); + List objects = new ArrayList<>(); int nextIndex = -1; for (int i = startIndex; i < storageObjects.size(); i++) { - final StorageObject storageObject = storageObjects.get(i); + final GoogleStorageObjectMetadata storageObject = storageObjects.get(i); - if (storageObject.getBucket().equals(getBucket()) - && storageObject.getName().startsWith(getPrefix())) { + if (storageObject.getBucket().equals(bucket) + && storageObject.getName().startsWith(prefix)) { - if (objects.size() == getMaxResults()) { + if (objects.size() == pageSize) { // We reached our max key limit; set nextIndex (which will lead to a result with truncated = true). nextIndex = i; break; @@ -268,30 +223,18 @@ public com.google.api.services.storage.model.Objects execute() } } - com.google.api.services.storage.model.Objects retVal = new com.google.api.services.storage.model.Objects(); - retVal.setItems(objects); - if (nextIndex >= 0) { - retVal.setNextPageToken(String.valueOf(nextIndex)); - } else { - retVal.setNextPageToken(null); - } + GoogleStorageObjectPage retVal = new GoogleStorageObjectPage( + objects, + nextIndex >= 0 ? String.valueOf(nextIndex) : null + ); return retVal; } } - } - } - - private static MockList mockList(String bucket, List storageObjects) - { - return new MockStorage().mockList(bucket, storageObjects); + }; } - static StorageObject makeStorageObject(final String bucket, final String key, final long size) + static GoogleStorageObjectMetadata makeStorageObject(final String bucket, final String key, final long size) { - final StorageObject summary = new StorageObject(); - summary.setBucket(bucket); - summary.setName(key); - summary.setSize(BigInteger.valueOf(size)); - return summary; + return new GoogleStorageObjectMetadata(bucket, key, size, null); } } diff --git a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/output/GoogleInputRangeTest.java b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/output/GoogleInputRangeTest.java new file mode 100644 index 000000000000..1c3dabcf984e --- /dev/null +++ b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/output/GoogleInputRangeTest.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.storage.google.output; + +import nl.jqno.equalsverifier.EqualsVerifier; +import org.junit.Test; + +public class GoogleInputRangeTest +{ + @Test + public void testEquals() + { + EqualsVerifier.forClass(GoogleInputRange.class) + .usingGetClass() + .verify(); + } +} diff --git a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/output/GoogleOutputConfigTest.java b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/output/GoogleOutputConfigTest.java new file mode 100644 index 000000000000..59081d96149b --- /dev/null +++ b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/output/GoogleOutputConfigTest.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.storage.google.output; + + +import org.apache.druid.error.DruidException; +import org.apache.druid.java.util.common.HumanReadableBytes; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +public class GoogleOutputConfigTest +{ + + @Rule + public final TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private static final String BUCKET = "bucket"; + private static final String PREFIX = "prefix"; + private static final int MAX_RETRY_COUNT = 0; + + @Test + public void testTooLargeChunkSize() + { + HumanReadableBytes chunkSize = new HumanReadableBytes("17MiB"); + Assert.assertThrows( + DruidException.class, + () -> new GoogleOutputConfig(BUCKET, PREFIX, temporaryFolder.newFolder(), chunkSize, MAX_RETRY_COUNT) + ); + } +} diff --git a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/output/GoogleStorageConnectorProviderTest.java b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/output/GoogleStorageConnectorProviderTest.java new file mode 100644 index 000000000000..df6c66e84c3f --- /dev/null +++ b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/output/GoogleStorageConnectorProviderTest.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.storage.google.output; + +import com.fasterxml.jackson.databind.InjectableValues; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.inject.Injector; +import com.google.inject.Key; +import com.google.inject.ProvisionException; +import com.google.inject.name.Names; +import org.apache.druid.guice.JsonConfigProvider; +import org.apache.druid.guice.LazySingleton; +import org.apache.druid.guice.StartupInjectorBuilder; +import org.apache.druid.storage.StorageConnector; +import org.apache.druid.storage.StorageConnectorModule; +import org.apache.druid.storage.StorageConnectorProvider; +import org.apache.druid.storage.google.GoogleInputDataConfig; +import org.apache.druid.storage.google.GoogleStorage; +import org.apache.druid.storage.google.GoogleStorageDruidModule; +import org.easymock.EasyMock; +import org.junit.Assert; +import org.junit.Test; + +import java.io.File; +import java.util.Properties; + +public class GoogleStorageConnectorProviderTest +{ + private static final String CUSTOM_NAMESPACE = "custom"; + + @Test + public void createGoogleStorageFactoryWithRequiredProperties() + { + + final Properties properties = new Properties(); + properties.setProperty(CUSTOM_NAMESPACE + ".type", "google"); + properties.setProperty(CUSTOM_NAMESPACE + ".bucket", "bucket"); + properties.setProperty(CUSTOM_NAMESPACE + ".prefix", "prefix"); + properties.setProperty(CUSTOM_NAMESPACE + ".tempDir", "/tmp"); + StorageConnectorProvider googleStorageConnectorProvider = getStorageConnectorProvider(properties); + + Assert.assertTrue(googleStorageConnectorProvider instanceof GoogleStorageConnectorProvider); + Assert.assertTrue(googleStorageConnectorProvider.get() instanceof GoogleStorageConnector); + Assert.assertEquals("bucket", ((GoogleStorageConnectorProvider) googleStorageConnectorProvider).getBucket()); + Assert.assertEquals("prefix", ((GoogleStorageConnectorProvider) googleStorageConnectorProvider).getPrefix()); + Assert.assertEquals(new File("/tmp"), ((GoogleStorageConnectorProvider) googleStorageConnectorProvider).getTempDir()); + + } + + @Test + public void createGoogleStorageFactoryWithMissingPrefix() + { + + final Properties properties = new Properties(); + properties.setProperty(CUSTOM_NAMESPACE + ".type", "bucket"); + properties.setProperty(CUSTOM_NAMESPACE + ".bucket", "bucket"); + properties.setProperty(CUSTOM_NAMESPACE + ".tempDir", "/tmp"); + Assert.assertThrows( + "Missing required creator property 'prefix'", + ProvisionException.class, + () -> getStorageConnectorProvider(properties) + ); + } + + + @Test + public void createGoogleStorageFactoryWithMissingbucket() + { + + final Properties properties = new Properties(); + properties.setProperty(CUSTOM_NAMESPACE + ".type", "Google"); + properties.setProperty(CUSTOM_NAMESPACE + ".prefix", "prefix"); + properties.setProperty(CUSTOM_NAMESPACE + ".tempDir", "/tmp"); + Assert.assertThrows( + "Missing required creator property 'bucket'", + ProvisionException.class, + () -> getStorageConnectorProvider(properties) + ); + } + + @Test + public void createGoogleStorageFactoryWithMissingTempDir() + { + + final Properties properties = new Properties(); + properties.setProperty(CUSTOM_NAMESPACE + ".type", "Google"); + properties.setProperty(CUSTOM_NAMESPACE + ".bucket", "bucket"); + properties.setProperty(CUSTOM_NAMESPACE + ".prefix", "prefix"); + + Assert.assertThrows( + "Missing required creator property 'tempDir'", + ProvisionException.class, + () -> getStorageConnectorProvider(properties) + ); + } + + private StorageConnectorProvider getStorageConnectorProvider(Properties properties) + { + StartupInjectorBuilder startupInjectorBuilder = new StartupInjectorBuilder().add( + new GoogleStorageDruidModule(), + new StorageConnectorModule(), + new GoogleStorageConnectorModule(), + binder -> { + JsonConfigProvider.bind( + binder, + CUSTOM_NAMESPACE, + StorageConnectorProvider.class, + Names.named(CUSTOM_NAMESPACE) + ); + + binder.bind(Key.get(StorageConnector.class, Names.named(CUSTOM_NAMESPACE))) + .toProvider(Key.get(StorageConnectorProvider.class, Names.named(CUSTOM_NAMESPACE))) + .in(LazySingleton.class); + } + ).withProperties(properties); + + Injector injector = startupInjectorBuilder.build(); + injector.getInstance(ObjectMapper.class).registerModules(new GoogleStorageConnectorModule().getJacksonModules()); + injector.getInstance(ObjectMapper.class).setInjectableValues( + new InjectableValues.Std() + .addValue( + GoogleStorage.class, + EasyMock.mock(GoogleStorage.class) + ).addValue( + GoogleInputDataConfig.class, + EasyMock.mock(GoogleInputDataConfig.class) + )); + + + return injector.getInstance(Key.get( + StorageConnectorProvider.class, + Names.named(CUSTOM_NAMESPACE) + )); + } +} diff --git a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/output/GoogleStorageConnectorTest.java b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/output/GoogleStorageConnectorTest.java new file mode 100644 index 000000000000..7a5e6ba107b8 --- /dev/null +++ b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/output/GoogleStorageConnectorTest.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.storage.google.output; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import org.apache.commons.io.IOUtils; +import org.apache.druid.java.util.common.HumanReadableBytes; +import org.apache.druid.storage.google.GoogleInputDataConfig; +import org.apache.druid.storage.google.GoogleStorage; +import org.apache.druid.storage.google.GoogleStorageObjectMetadata; +import org.apache.druid.storage.google.GoogleStorageObjectPage; +import org.easymock.Capture; +import org.easymock.EasyMock; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.util.List; + +public class GoogleStorageConnectorTest +{ + private static final String BUCKET = "BUCKET"; + private static final String PREFIX = "PREFIX"; + private static final String TEST_FILE = "TEST_FILE"; + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private static final int MAX_LISTING_LEN = 10; + + private static final HumanReadableBytes CHUNK_SIZE = new HumanReadableBytes("4MiB"); + + GoogleStorageConnector googleStorageConnector; + private final GoogleStorage googleStorage = EasyMock.createMock(GoogleStorage.class); + + @Before + public void setUp() throws IOException + { + GoogleOutputConfig config = new GoogleOutputConfig(BUCKET, PREFIX, temporaryFolder.newFolder(), CHUNK_SIZE, null); + GoogleInputDataConfig inputDataConfig = new GoogleInputDataConfig(); + inputDataConfig.setMaxListingLength(MAX_LISTING_LEN); + googleStorageConnector = new GoogleStorageConnector(config, googleStorage, inputDataConfig); + } + + @Test + public void testPathExistsSuccess() + { + final Capture bucket = Capture.newInstance(); + final Capture path = Capture.newInstance(); + EasyMock.expect(googleStorage.exists(EasyMock.capture(bucket), EasyMock.capture(path))).andReturn(true); + EasyMock.replay(googleStorage); + Assert.assertTrue(googleStorageConnector.pathExists(TEST_FILE)); + Assert.assertEquals(BUCKET, bucket.getValue()); + Assert.assertEquals(PREFIX + "/" + TEST_FILE, path.getValue()); + EasyMock.verify(googleStorage); + } + + @Test + public void testPathExistsFailure() + { + final Capture bucket = Capture.newInstance(); + final Capture path = Capture.newInstance(); + EasyMock.expect(googleStorage.exists(EasyMock.capture(bucket), EasyMock.capture(path))).andReturn(false); + EasyMock.replay(googleStorage); + Assert.assertFalse(googleStorageConnector.pathExists(TEST_FILE)); + Assert.assertEquals(BUCKET, bucket.getValue()); + Assert.assertEquals(PREFIX + "/" + TEST_FILE, path.getValue()); + EasyMock.verify(googleStorage); + } + + @Test + public void testDeleteFile() throws IOException + { + Capture bucketCapture = EasyMock.newCapture(); + Capture pathCapture = EasyMock.newCapture(); + googleStorage.delete( + EasyMock.capture(bucketCapture), + EasyMock.capture(pathCapture) + ); + + EasyMock.replay(googleStorage); + googleStorageConnector.deleteFile(TEST_FILE); + Assert.assertEquals(BUCKET, bucketCapture.getValue()); + Assert.assertEquals(PREFIX + "/" + TEST_FILE, pathCapture.getValue()); + } + + @Test + public void testDeleteFiles() throws IOException + { + Capture containerCapture = EasyMock.newCapture(); + Capture> pathsCapture = EasyMock.newCapture(); + googleStorage.batchDelete(EasyMock.capture(containerCapture), EasyMock.capture(pathsCapture)); + EasyMock.replay(googleStorage); + googleStorageConnector.deleteFiles(ImmutableList.of(TEST_FILE + "_1.part", TEST_FILE + "_2.json")); + Assert.assertEquals(BUCKET, containerCapture.getValue()); + Assert.assertEquals( + ImmutableList.of( + PREFIX + "/" + TEST_FILE + "_1.part", + PREFIX + "/" + TEST_FILE + "_2.json" + ), + Lists.newArrayList(pathsCapture.getValue()) + ); + EasyMock.reset(googleStorage); + } + + @Test + public void testListDir() throws IOException + { + GoogleStorageObjectMetadata objectMetadata1 = new GoogleStorageObjectMetadata( + BUCKET, + PREFIX + "/x/y" + TEST_FILE, + (long) 3, + null + ); + GoogleStorageObjectMetadata objectMetadata2 = new GoogleStorageObjectMetadata( + BUCKET, + PREFIX + "/p/q/r/" + TEST_FILE, + (long) 4, + null + ); + Capture maxListingCapture = EasyMock.newCapture(); + Capture pageTokenCapture = EasyMock.newCapture(); + EasyMock.expect(googleStorage.list( + EasyMock.anyString(), + EasyMock.anyString(), + EasyMock.capture(maxListingCapture), + EasyMock.capture(pageTokenCapture) + )) + .andReturn(new GoogleStorageObjectPage(ImmutableList.of(objectMetadata1, objectMetadata2), null)); + EasyMock.replay(googleStorage); + List ret = Lists.newArrayList(googleStorageConnector.listDir("")); + Assert.assertEquals(ImmutableList.of("x/y" + TEST_FILE, "p/q/r/" + TEST_FILE), ret); + Assert.assertEquals(MAX_LISTING_LEN, maxListingCapture.getValue().intValue()); + Assert.assertEquals(null, pageTokenCapture.getValue()); + + } + + @Test + public void testRead() throws IOException + { + String data = "test"; + EasyMock.expect(googleStorage.size(EasyMock.anyString(), EasyMock.anyString())) + .andReturn(4L); + EasyMock.expect( + googleStorage.getInputStream( + EasyMock.anyString(), + EasyMock.anyString(), + EasyMock.anyLong(), + EasyMock.anyLong() + ) + ).andReturn(IOUtils.toInputStream(data, StandardCharsets.UTF_8)); + + EasyMock.replay(googleStorage); + InputStream is = googleStorageConnector.read(TEST_FILE); + byte[] dataBytes = new byte[data.length()]; + Assert.assertEquals(data.length(), is.read(dataBytes)); + Assert.assertEquals(-1, is.read()); + Assert.assertEquals(data, new String(dataBytes, StandardCharsets.UTF_8)); + } + + @Test + public void testReadRange() throws IOException + { + String data = "test"; + + for (int start = 0; start < data.length(); ++start) { + for (long length = 1; length <= data.length() - start; ++length) { + String dataQueried = data.substring(start, start + ((Long) length).intValue()); + EasyMock.expect(googleStorage.getInputStream( + EasyMock.anyString(), + EasyMock.anyString(), + EasyMock.anyLong(), + EasyMock.anyLong() + )) + .andReturn(IOUtils.toInputStream(dataQueried, StandardCharsets.UTF_8)); + EasyMock.replay(googleStorage); + + InputStream is = googleStorageConnector.readRange(TEST_FILE, start, length); + byte[] dataBytes = new byte[((Long) length).intValue()]; + Assert.assertEquals(length, is.read(dataBytes)); + Assert.assertEquals(-1, is.read()); + Assert.assertEquals(dataQueried, new String(dataBytes, StandardCharsets.UTF_8)); + EasyMock.reset(googleStorage); + } + } + + } +} diff --git a/integration-tests-ex/cases/pom.xml b/integration-tests-ex/cases/pom.xml index a2b5e596671a..4eb51f9b75d1 100644 --- a/integration-tests-ex/cases/pom.xml +++ b/integration-tests-ex/cases/pom.xml @@ -145,23 +145,6 @@ aws-java-sdk-core ${aws.sdk.version} - - com.google.api-client - google-api-client - ${com.google.apis.client.version} - provided - - - com.google.apis - google-api-services-storage - ${com.google.apis.storage.version} - - - com.google.api-client - google-api-client - - - com.microsoft.azure azure-storage @@ -316,6 +299,18 @@ curator-client 5.5.0 + + com.google.cloud + google-cloud-storage + 2.29.1 + provided + + + com.google.api-client + google-api-client + 2.2.0 + provided + diff --git a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/utils/GcsTestUtil.java b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/utils/GcsTestUtil.java index f1823bdcf87d..9c44d5b14392 100644 --- a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/utils/GcsTestUtil.java +++ b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/utils/GcsTestUtil.java @@ -25,7 +25,7 @@ import com.google.api.client.http.HttpTransport; import com.google.api.client.json.JsonFactory; import com.google.api.client.json.jackson2.JacksonFactory; -import com.google.api.services.storage.Storage; +import com.google.cloud.storage.StorageOptions; import com.google.common.base.Predicates; import com.google.common.base.Suppliers; import org.apache.druid.java.util.common.logger.Logger; @@ -84,10 +84,7 @@ private GoogleStorage googleStorageClient() throws GeneralSecurityException, IOE GoogleCredential finalCredential = credential; return new GoogleStorage( Suppliers.memoize( - () -> new Storage - .Builder(httpTransport, jsonFactory, finalCredential) - .setApplicationName("GcsTestUtil") - .build() + () -> StorageOptions.getDefaultInstance().getService() ) ); } diff --git a/licenses.yaml b/licenses.yaml index 0eea187bf6e1..4367878cbd07 100644 --- a/licenses.yaml +++ b/licenses.yaml @@ -3883,40 +3883,209 @@ name: Google Cloud Storage JSON API license_category: binary module: extensions/druid-google-extensions license_name: Apache License version 2.0 -version: v1-rev20230301-2.0.0 +version: v1-rev20231028-2.0.0 libraries: - com.google.apis: google-api-services-storage --- -name: Google Compute Engine API +name: Google APIs Client Library For Java license_category: binary -module: extensions/gce-extensions +module: java-core license_name: Apache License version 2.0 -version: v1-rev20230606-2.0.0 +version: 2.2.0 libraries: - - com.google.apis: google-api-services-compute + - com.google.api-client: google-api-client --- -name: Google APIs Client Library For Java +name: Google Storage Client Library For Java license_category: binary -module: java-core +module: extensions/druid-google-extensions +license_name: Apache-2.0 +version: 2.29.1 +libraries: + - com.google.cloud: google-cloud-storage + +--- + +name: Google Cloud Storage API +license_category: binary +module: extensions/druid-google-extensions +license_name: BSD-3-Clause License +version: 2.20.0 +libraries: + - com.google.cloud: google-cloud-storage + - com.google.api: api-common + +--- + +name: gax +license_category: binary +module: extensions/druid-google-extensions +license_name: BSD-3-Clause License +version: 2.37.0 +libraries: + - com.google.api: gax + - com.google.api: gax-grpc + - com.google.api: gax-httpjson + +--- + +name: grpc-api +license_category: binary +module: extensions/druid-google-extensions license_name: Apache License version 2.0 -version: 2.2.0 +version: 2.29.1-alpha libraries: - - com.google.api-client: google-api-client + - com.google.api.grpc: gapic-google-cloud-storage-v2 + - com.google.api.grpc: grpc-google-cloud-storage-v2 + - com.google.api.grpc: proto-google-cloud-storage-v2 --- -name: Google HTTP Client Library For Java +name: grpc-io license_category: binary -module: java-core +module: extensions/druid-google-extensions +license_name: Apache License version 2.0 +version: 1.59.0 +libraries: + - io.grpc: grpc-alts + - io.grpc: grpc-api + - io.grpc: grpc-auth + - io.grpc: grpc-context + - io.grpc: grpc-core + - io.grpc: grpc-grpclib + - io.grpc: grpc-inprocess + - io.grpc: grpc-protobuf + - io.grpc: grpc-protobuf-lite + - io.grpc: grpc-stub + +--- + +name: proto-google-common-protos +license_category: binary +module: extensions/druid-google-extensions +license_name: Apache License version 2.0 +version: 2.28.0 +libraries: + - com.google.api.grpc: proto-google-common-protos + +--- + +name: proto-google-iam-v1 +license_category: binary +module: extensions/druid-google-extensions +license_name: Apache License version 2.0 +version: 1.23.0 +libraries: + - com.google.api.grpc: proto-google-iam-v1 + +--- + +name: google-auth +license_category: binary +module: extensions/druid-google-extensions +license_name: BSD-3-Clause License +version: 1.20.0 +libraries: + - com.google.auth: google-auth-library-credentials + - com.google.auth: google-auth-library-oauth2-http + +--- + +name: google-auto-value +license_category: binary +module: extensions/druid-google-extensions +license_name: Apache License version 2.0 +version: 1.10.4 +libraries: + - com.google.auto.value: auto-value-annotations + +--- + +name: google-cloud +license_category: binary +module: extensions/druid-google-extensions +license_name: Apache License version 2.0 +version: 2.27.0 +libraries: + - com.google.cloud: google-cloud-core + - com.google.cloud: google-cloud-core-grpc + - com.google.cloud: google-cloud-core-http + +--- + +name: listenablefuture +license_category: binary +module: extensions/druid-google-extensions +license_name: Apache License version 2.0 +version: 9999.0-empty-to-avoid-conflict-with-guava +libraries: + - com.google.guava: listenablefuture + +--- + +name: google-http-client +license_category: binary +module: extensions/druid-google-extensions license_name: Apache License version 2.0 -version: 1.42.3 +version: 1.43.3 +libraries: + - com.google.http-client: google-http-client-apache-v2 + - com.google.http-client: google-http-client-appengine + - com.google.http-client: google-http-client-gson + +--- + +name: google-protobuf +license_category: binary +module: extensions/druid-google-extensions +license_name: BSD-3-Clause License +version: 3.24.4 +libraries: + - com.google.protobuf: protobuf-java-util + +--- + +name: google-grpc +license_category: binary +module: extensions/druid-google-extensions +license_name: Apache License version 2.0 +version: 1.59.0 +libraries: + - io.grpc: grpc-grpclb + +--- + +name: io-opencensus +license_category: binary +module: extensions/druid-google-extensions +license_name: Apache License version 2.0 +version: 0.31.1 +libraries: + - io.opencensus: opencensus-api + - io.opencensus: opencensus-contrib-http-util + +--- + +name: conscrypt-openjdk-uber +license_category: binary +module: extensions/druid-google-extensions +license_name: Apache License version 2.0 +version: 2.5.2 +libraries: + - org.conscrypt: conscrypt-openjdk-uber + +--- + +name: threetenbp +license_category: binary +module: extensions/druid-google-extensions +license_name: BSD-3-Clause License +version: 1.6.8 libraries: - - com.google.http-client: google-http-client - - com.google.http-client: google-http-client-jackson2 + - org.threeten: threetenbp --- diff --git a/pom.xml b/pom.xml index 8c3b26fbaf43..59d8bacb6396 100644 --- a/pom.xml +++ b/pom.xml @@ -129,7 +129,7 @@ 2.2.0 1.42.3 v1-rev20230606-2.0.0 - v1-rev20230301-2.0.0 + 2.29.1 maven.org