Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add MSQ Durable Storage Connector for Google Cloud Storage and change current Google Cloud Storage client library #15398

Merged
merged 30 commits into from
Dec 14, 2023
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
f40e285
Initial commit with the new GCS library
gargvishesh Nov 16, 2023
b4e719a
Changes for GCS Storage Connector
gargvishesh Nov 17, 2023
09fef50
Remove exception
gargvishesh Nov 17, 2023
10bad2f
Add prefix to paths
gargvishesh Nov 20, 2023
9211a69
Fixes
gargvishesh Nov 20, 2023
9a6ab03
Working version with fixes for removing try-with-resource and objectP…
gargvishesh Nov 21, 2023
546b1ef
Minor changes
gargvishesh Nov 21, 2023
26be863
Use full path for deleteRecursively and fix tests
gargvishesh Nov 21, 2023
937056b
Refactoring and checkstyle fixes
gargvishesh Nov 21, 2023
a166099
Doc updates
gargvishesh Nov 21, 2023
a84a368
Remove GCS BOM in dependency management
gargvishesh Nov 21, 2023
1e8b38f
Remove comments
gargvishesh Nov 21, 2023
8319d4c
Create temp dir in google storage connector contructor
gargvishesh Nov 30, 2023
dad4289
Address review comments
gargvishesh Dec 4, 2023
b0d15be
Limit internal buffer size of GCS storage reader and writer channels
gargvishesh Dec 4, 2023
f9105a9
Update write buffer size to 2 MB
gargvishesh Dec 6, 2023
c05a9c0
Add GoogleStorageConnectorTest
gargvishesh Dec 7, 2023
93a6df3
Merge branch 'master' into 35053-gcs-durable-storage-connector
gargvishesh Dec 7, 2023
55febef
Address review comments
gargvishesh Dec 7, 2023
f09c461
Remove exception handling
gargvishesh Dec 8, 2023
44e9b10
Fix exceptions and handling
gargvishesh Dec 8, 2023
4454afb
Fix exceptions and handling - part 2
gargvishesh Dec 12, 2023
9f30fd2
Fix exceptions and handling - part 3
gargvishesh Dec 12, 2023
fc51307
Fix build errors and pom
gargvishesh Dec 12, 2023
54a9687
Add licenses and tests
gargvishesh Dec 12, 2023
306d71a
fix license.yaml module name
gargvishesh Dec 13, 2023
3bd3125
Add toString to GoogleInputRange
gargvishesh Dec 13, 2023
cb38ccd
Add GoogleStorageTest
gargvishesh Dec 13, 2023
8294a33
Fix forbidden api error
gargvishesh Dec 13, 2023
9e08c84
Fix forbidden api error - part 2
gargvishesh Dec 13, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 15 additions & 9 deletions extensions-core/google-extensions/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,18 @@
<relativePath>../../pom.xml</relativePath>
</parent>

<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>libraries-bom</artifactId>
<version>${com.google.cloud.libraries.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

<dependencies>
<dependency>
<groupId>org.apache.druid</groupId>
Expand All @@ -48,16 +60,10 @@
</dependency>

<dependency>
<groupId>com.google.apis</groupId>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We might want to update :

<dependency>
<groupId>com.google.apis</groupId>
<artifactId>google-api-services-storage</artifactId>
<version>${com.google.apis.storage.version}</version>
<exclusions>
<exclusion>
<groupId>com.google.api-client</groupId>
<artifactId>google-api-client</artifactId>
</exclusion>
</exclusions>
</dependency>
as well

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed now.

<artifactId>google-api-services-storage</artifactId>
<version>${com.google.apis.storage.version}</version>
<exclusions>
<exclusion>
<groupId>com.google.api-client</groupId>
<artifactId>google-api-client</artifactId>
</exclusion>
</exclusions>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-storage</artifactId>
</dependency>

<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
import org.apache.druid.data.input.impl.CloudObjectLocation;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.storage.google.GoogleByteSource;
import org.apache.druid.storage.google.GoogleStorage;
import org.apache.druid.storage.google.GoogleStorageDruidModule;
import org.apache.druid.storage.google.GoogleStorage;
import org.apache.druid.storage.google.GoogleUtils;

import javax.annotation.Nullable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.api.services.storage.model.StorageObject;
import com.google.common.collect.Iterators;
import org.apache.druid.data.input.InputEntity;
import org.apache.druid.data.input.InputSplit;
Expand All @@ -35,14 +34,13 @@
import org.apache.druid.data.input.impl.systemfield.SystemFields;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.storage.google.GoogleInputDataConfig;
import org.apache.druid.storage.google.GoogleStorage;
import org.apache.druid.storage.google.GoogleStorageDruidModule;
import org.apache.druid.storage.google.GoogleStorage;
import org.apache.druid.storage.google.GoogleUtils;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.IOException;
import java.math.BigInteger;
import java.net.URI;
import java.util.Collections;
import java.util.Iterator;
Expand Down Expand Up @@ -139,23 +137,23 @@ public Iterator<LocationWithSize> getDescriptorIteratorForPrefixes(List<URI> pre
@Override
public long getObjectSize(CloudObjectLocation location) throws IOException
{
final StorageObject storageObject = storage.getMetadata(location.getBucket(), location.getPath());
final GoogleStorage.GoogleStorageObjectMetadata storageObject = storage.getMetadata(location.getBucket(), location.getPath());
return getSize(storageObject);
}
}

return new SplitWidget();
}

private static long getSize(final StorageObject object)
private static long getSize(final GoogleStorage.GoogleStorageObjectMetadata object)
{
final BigInteger sizeInBigInteger = object.getSize();
final Long sizeInLong = object.getSize();

if (sizeInBigInteger == null) {
if (sizeInLong == null) {
return Long.MAX_VALUE;
} else {
try {
return sizeInBigInteger.longValueExact();
return sizeInLong;
}
catch (ArithmeticException e) {
LOG.warn(
Expand All @@ -164,7 +162,7 @@ private static long getSize(final StorageObject object)
+ "The max long value will be used for its size instead.",
object.getBucket(),
object.getName(),
sizeInBigInteger
sizeInLong
);
return Long.MAX_VALUE;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,12 @@ public String getPath()
@Override
public InputStream openStream() throws IOException
{
return storage.get(bucket, path);
return storage.getInputStream(bucket, path);
}

public InputStream openStream(long start) throws IOException
{
return storage.get(bucket, path, start);
return storage.getInputStream(bucket, path, start);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ FileUtils.FileCopyResult getSegmentFiles(final String bucket, final String path,
public InputStream getInputStream(URI uri) throws IOException
{
String path = StringUtils.maybeRemoveLeadingSlash(uri.getPath());
return storage.get(uri.getHost() != null ? uri.getHost() : uri.getAuthority(), path);
return storage.getInputStream(uri.getHost() != null ? uri.getHost() : uri.getAuthority(), path);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,24 @@
package org.apache.druid.storage.google;

import com.google.api.client.http.AbstractInputStreamContent;
import com.google.api.services.storage.Storage;
import com.google.api.services.storage.Storage.Objects.Get;
import com.google.api.services.storage.model.StorageObject;
import com.google.api.gax.paging.Page;
import com.google.cloud.ReadChannel;
import com.google.cloud.WriteChannel;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import com.google.common.base.Supplier;
import com.google.common.collect.Iterables;

import javax.annotation.Nullable;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.channels.Channels;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

public class GoogleStorage
{
Expand All @@ -41,64 +52,197 @@ public class GoogleStorage
*/
private final Supplier<Storage> storage;

public GoogleStorage(Supplier<Storage> storage)
public static class GoogleStorageObjectMetadata
{
final String bucket;
final String name;
final Long size;
Long lastUpdateTime;

public GoogleStorageObjectMetadata(final String bucket, final String name, final Long size, final Long lastUpdateTime)
{
this.bucket = bucket;
this.name = name;
this.size = size;
this.lastUpdateTime = lastUpdateTime;
}

public void setLastUpdateTime(Long lastUpdateTime)
{
this.lastUpdateTime = lastUpdateTime;
}


public String getBucket()
{
return bucket;
}

public String getName()
{
return name;
}

public Long getSize()
{
return size;
}

public Long getLastUpdateTime()
{
return lastUpdateTime;
}
}

public static class GoogleStorageObjectPage
{
final List<GoogleStorage.GoogleStorageObjectMetadata> objectList;

@Nullable
final String nextPageToken;

public GoogleStorageObjectPage(
List<GoogleStorage.GoogleStorageObjectMetadata> objectList,
String nextPageToken
)
{
this.objectList = objectList;
this.nextPageToken = nextPageToken;
}

public List<GoogleStorage.GoogleStorageObjectMetadata> getObjectList()
{
return objectList;
}

@Nullable
public String getNextPageToken()
{
return nextPageToken;
}
}

public GoogleStorage(final Supplier<Storage> storage)
{
this.storage = storage;
}

public void insert(final String bucket, final String path, AbstractInputStreamContent mediaContent) throws IOException
{
Storage.Objects.Insert insertObject = storage.get().objects().insert(bucket, null, mediaContent);
insertObject.setName(path);
insertObject.getMediaHttpUploader().setDirectUploadEnabled(false);
insertObject.execute();
storage.get().createFrom(getBlobInfo(bucket, path), mediaContent.getInputStream());
}

public InputStream get(final String bucket, final String path) throws IOException
public InputStream getInputStream(final String bucket, final String path) throws IOException
{
return get(bucket, path, 0);
return getInputStream(bucket, path, 0, null);
}

public InputStream get(final String bucket, final String path, long start) throws IOException
public InputStream getInputStream(final String bucket, final String path, long start) throws IOException
{
final Get get = storage.get().objects().get(bucket, path);
InputStream inputStream = get.executeMediaAsInputStream();
inputStream.skip(start);
return inputStream;
return getInputStream(bucket, path, start, null);
}

public StorageObject getMetadata(final String bucket, final String path) throws IOException
public InputStream getInputStream(final String bucket, final String path, long start, @Nullable Long length) throws IOException
{
return storage.get().objects().get(bucket, path).execute();
ReadChannel reader = storage.get().reader(bucket, path);
reader.seek(start);
if (length != null) {
reader.limit(start + length);
}
return Channels.newInputStream(reader);
}

public void delete(final String bucket, final String path) throws IOException
public OutputStream getObjectOutputStream(
final String bucket,
final String path
)
{
WriteChannel writer = storage.get().writer(getBlobInfo(bucket, path));
return Channels.newOutputStream(writer);
}

public GoogleStorage.GoogleStorageObjectMetadata getMetadata(
final String bucket,
final String path
)
{
storage.get().objects().delete(bucket, path).execute();
Blob blob = storage.get().get(bucket, path, Storage.BlobGetOption.fields(Storage.BlobField.values()));
return new GoogleStorage.GoogleStorageObjectMetadata(
blob.getBucket(),
blob.getName(),
blob.getSize(),
blob.getUpdateTimeOffsetDateTime()
.toEpochSecond()
);
}

public void delete(final String bucket, final String path) throws IOException
{
storage.get().delete(bucket, path);
}
public boolean exists(final String bucket, final String path)
{
try {
return storage.get().objects().get(bucket, path).executeUsingHead().isSuccessStatusCode();
}
catch (Exception e) {
return false;
}

Blob blob = storage.get().get(bucket, path);
return blob != null;
}

public long size(final String bucket, final String path) throws IOException
{
return storage.get().objects().get(bucket, path).execute().getSize().longValue();
Blob blob = storage.get().get(bucket, path, Storage.BlobGetOption.fields(Storage.BlobField.SIZE));
return blob.getSize();
}

public String version(final String bucket, final String path) throws IOException
{
return storage.get().objects().get(bucket, path).execute().getEtag();
Blob blob = storage.get().get(bucket, path, Storage.BlobGetOption.fields(Storage.BlobField.GENERATION));
return blob.getGeneratedId();
}

public GoogleStorage.GoogleStorageObjectPage list(
final String bucket,
@Nullable final String prefix,
@Nullable final Long pageSize,
@Nullable final String pageToken
) throws IOException
{
List<Storage.BlobListOption> options = new ArrayList<>();

if (prefix != null) {
options.add(Storage.BlobListOption.prefix(prefix));
}

if (pageSize != null) {
options.add(Storage.BlobListOption.pageSize(pageSize));
}

if (pageToken != null) {
options.add(Storage.BlobListOption.pageToken(pageToken));
}

Page<Blob> blobPage = storage.get().list(bucket, options.toArray(new Storage.BlobListOption[0]));

List<GoogleStorage.GoogleStorageObjectMetadata> googleStorageObjectMetadataList = blobPage.streamValues()
.map(blob -> new GoogleStorage.GoogleStorageObjectMetadata(
blob.getBucket(),
blob.getName(),
blob.getSize(),
blob.getUpdateTimeOffsetDateTime()
.toEpochSecond()
))
.collect(Collectors.toList());

return new GoogleStorage.GoogleStorageObjectPage(googleStorageObjectMetadataList, blobPage.getNextPageToken());

}
public void batchDelete(final String bucket, final Iterable<String> paths){
storage.get().delete(Iterables.transform(paths, input -> BlobId.of(bucket, input)));
}

public Storage.Objects.List list(final String bucket) throws IOException
private BlobInfo getBlobInfo(final String bucket, final String path)
{
return storage.get().objects().list(bucket);
BlobId blobId = BlobId.of(bucket, path);
return BlobInfo.newBuilder(blobId).build();

}
}
Loading
Loading