From f2dbb56d9eb8933966a2dfcda22e42a8bee79b11 Mon Sep 17 00:00:00 2001 From: JiamingMai Date: Fri, 12 Apr 2024 01:01:46 +0800 Subject: [PATCH] Add Alluxio file system support --- lib/trino-filesystem-alluxio/pom.xml | 125 ++++++ .../alluxio/AlluxioFileIterator.java | 64 +++ .../filesystem/alluxio/AlluxioFileSystem.java | 374 ++++++++++++++++++ .../alluxio/AlluxioFileSystemFactory.java | 48 +++ .../alluxio/AlluxioFileSystemInput.java | 86 ++++ .../alluxio/AlluxioFileSystemInputFile.java | 162 ++++++++ .../alluxio/AlluxioFileSystemModule.java | 29 ++ .../alluxio/AlluxioFileSystemOutputFile.java | 112 ++++++ .../alluxio/AlluxioTrinoInputStream.java | 137 +++++++ .../alluxio/AlluxioTrinoOutputStream.java | 66 ++++ .../filesystem/alluxio/AlluxioUtils.java | 111 ++++++ .../AbstractTestAlluxioFileSystem.java | 118 ++++++ .../alluxio/TestAlluxioFileSystem.java | 95 +++++ .../filesystem/alluxio/TestAlluxioUtils.java | 58 +++ lib/trino-filesystem-manager/pom.xml | 5 + .../filesystem/manager/FileSystemConfig.java | 13 + .../filesystem/manager/FileSystemModule.java | 7 + .../manager/TestFileSystemConfig.java | 3 + pom.xml | 127 ++++-- 19 files changed, 1712 insertions(+), 28 deletions(-) create mode 100644 lib/trino-filesystem-alluxio/pom.xml create mode 100644 lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileIterator.java create mode 100644 lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystem.java create mode 100644 lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystemFactory.java create mode 100644 lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystemInput.java create mode 100644 lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystemInputFile.java create mode 100644 lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystemModule.java create mode 100644 lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystemOutputFile.java create mode 100644 lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioTrinoInputStream.java create mode 100644 lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioTrinoOutputStream.java create mode 100644 lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioUtils.java create mode 100644 lib/trino-filesystem-alluxio/src/test/java/io/trino/filesystem/alluxio/AbstractTestAlluxioFileSystem.java create mode 100644 lib/trino-filesystem-alluxio/src/test/java/io/trino/filesystem/alluxio/TestAlluxioFileSystem.java create mode 100644 lib/trino-filesystem-alluxio/src/test/java/io/trino/filesystem/alluxio/TestAlluxioUtils.java diff --git a/lib/trino-filesystem-alluxio/pom.xml b/lib/trino-filesystem-alluxio/pom.xml new file mode 100644 index 000000000000..8f6523ebe09a --- /dev/null +++ b/lib/trino-filesystem-alluxio/pom.xml @@ -0,0 +1,125 @@ + + + 4.0.0 + + + io.trino + trino-root + 460-SNAPSHOT + ../../pom.xml + + + trino-filesystem-alluxio + Trino Filesystem - Alluxio + + + ${project.parent.basedir} + true + + + + + com.google.inject + guice + + + + io.trino + trino-filesystem + + + + io.trino + trino-memory-context + + + + io.trino + trino-spi + + + + org.alluxio + alluxio-core-client-fs + + + + org.alluxio + alluxio-core-common + + + + org.alluxio + alluxio-core-transport + + + + com.github.docker-java + docker-java-api + test + + + + io.airlift + junit-extensions + test + + + + io.trino + trino-filesystem + ${project.version} + tests + test + + + + io.trino + trino-spi + test-jar + test + + + + io.trino + trino-testing + test + + + + io.trino + trino-testing-containers + test + + + + org.assertj + assertj-core + test + + + + org.junit.jupiter + junit-jupiter-api + test + + + + org.junit.jupiter + junit-jupiter-engine + test + + + + org.testcontainers + junit-jupiter + test + + + + org.testcontainers + testcontainers + test + + + diff --git a/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileIterator.java b/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileIterator.java new file mode 100644 index 000000000000..ba16de33df26 --- /dev/null +++ b/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileIterator.java @@ -0,0 +1,64 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.filesystem.alluxio; + +import alluxio.client.file.URIStatus; +import io.trino.filesystem.FileEntry; +import io.trino.filesystem.FileIterator; +import io.trino.filesystem.Location; + +import java.io.IOException; +import java.time.Instant; +import java.util.Iterator; +import java.util.List; +import java.util.Optional; + +import static io.trino.filesystem.alluxio.AlluxioUtils.convertToLocation; +import static java.util.Objects.requireNonNull; + +public class AlluxioFileIterator + implements FileIterator +{ + private final Iterator files; + private final String mountRoot; + + public AlluxioFileIterator(List files, String mountRoot) + { + this.files = requireNonNull(files.iterator(), "files is null"); + this.mountRoot = requireNonNull(mountRoot, "mountRoot is null"); + } + + @Override + public boolean hasNext() + throws IOException + { + return files.hasNext(); + } + + @Override + public FileEntry next() + throws IOException + { + if (!hasNext()) { + return null; + } + URIStatus fileStatus = files.next(); + Location location = convertToLocation(fileStatus, mountRoot); + return new FileEntry( + location, + fileStatus.getLength(), + Instant.ofEpochMilli(fileStatus.getLastModificationTimeMs()), + Optional.empty()); + } +} diff --git a/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystem.java b/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystem.java new file mode 100644 index 000000000000..371e521baecb --- /dev/null +++ b/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystem.java @@ -0,0 +1,374 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.filesystem.alluxio; + +import alluxio.AlluxioURI; +import alluxio.client.file.FileSystem; +import alluxio.client.file.URIStatus; +import alluxio.exception.AlluxioException; +import alluxio.exception.FileDoesNotExistException; +import alluxio.exception.runtime.NotFoundRuntimeException; +import alluxio.grpc.CreateDirectoryPOptions; +import alluxio.grpc.DeletePOptions; +import alluxio.grpc.ListStatusPOptions; +import io.trino.filesystem.FileIterator; +import io.trino.filesystem.Location; +import io.trino.filesystem.TrinoFileSystem; +import io.trino.filesystem.TrinoInputFile; +import io.trino.filesystem.TrinoOutputFile; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.time.Instant; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +import static io.trino.filesystem.alluxio.AlluxioUtils.convertToAlluxioURI; +import static java.util.Objects.requireNonNull; +import static java.util.UUID.randomUUID; + +public class AlluxioFileSystem + implements TrinoFileSystem +{ + private final FileSystem alluxioClient; + private final String mountRoot; + private final Location rootLocation; + + public AlluxioFileSystem(FileSystem alluxioClient) + { + this(alluxioClient, "/", Location.of("alluxio:///")); + } + + public AlluxioFileSystem(FileSystem alluxioClient, String mountRoot, Location rootLocation) + { + this.alluxioClient = requireNonNull(alluxioClient, "filesystem is null"); + this.mountRoot = mountRoot; // default alluxio mount root + this.rootLocation = requireNonNull(rootLocation, "rootLocation is null"); + } + + public String getMountRoot() + { + return mountRoot; + } + + @Override + public TrinoInputFile newInputFile(Location location) + { + ensureNotRootLocation(location); + ensureNotEndWithSlash(location); + return new AlluxioFileSystemInputFile(location, null, alluxioClient, mountRoot, Optional.empty()); + } + + @Override + public TrinoInputFile newInputFile(Location location, long length) + { + ensureNotRootLocation(location); + ensureNotEndWithSlash(location); + return new AlluxioFileSystemInputFile(location, length, alluxioClient, mountRoot, Optional.empty()); + } + + @Override + public TrinoInputFile newInputFile(Location location, long length, Instant lastModified) + { + ensureNotRootLocation(location); + ensureNotEndWithSlash(location); + return new AlluxioFileSystemInputFile(location, length, alluxioClient, mountRoot, Optional.of(lastModified)); + } + + @Override + public TrinoOutputFile newOutputFile(Location location) + { + ensureNotRootLocation(location); + ensureNotEndWithSlash(location); + return new AlluxioFileSystemOutputFile(rootLocation, location, alluxioClient, mountRoot); + } + + @Override + public void deleteFile(Location location) + throws IOException + { + ensureNotRootLocation(location); + ensureNotEndWithSlash(location); + try { + alluxioClient.delete(convertToAlluxioURI(location, mountRoot)); + } + catch (FileDoesNotExistException _) { + } + catch (AlluxioException e) { + throw new IOException("Error deleteFile %s".formatted(location), e); + } + } + + @Override + public void deleteDirectory(Location location) + throws IOException + { + try { + AlluxioURI uri = convertToAlluxioURI(location, mountRoot); + URIStatus status = alluxioClient.getStatus(uri); + if (status == null) { + return; + } + if (!status.isFolder()) { + throw new IOException("delete directory cannot be called on a file %s".formatted(location)); + } + DeletePOptions deletePOptions = DeletePOptions.newBuilder().setRecursive(true).build(); + // recursive delete on the root directory must be handled manually + if (location.path().isEmpty() || location.path().equals(mountRoot)) { + for (URIStatus uriStatus : alluxioClient.listStatus(uri)) { + alluxioClient.delete(new AlluxioURI(uriStatus.getPath()), deletePOptions); + } + } + else { + alluxioClient.delete(uri, deletePOptions); + } + } + catch (FileDoesNotExistException | NotFoundRuntimeException e) { + } + catch (AlluxioException e) { + throw new IOException("Error deleteDirectory %s".formatted(location), e); + } + } + + @Override + public void renameFile(Location source, Location target) + throws IOException + { + try { + ensureNotRootLocation(source); + ensureNotEndWithSlash(source); + ensureNotRootLocation(target); + ensureNotEndWithSlash(target); + } + catch (IllegalStateException e) { + throw new IllegalStateException( + "Cannot rename file from %s to %s as one of them is root location".formatted(source, target), e); + } + AlluxioURI sourceUri = convertToAlluxioURI(source, mountRoot); + AlluxioURI targetUri = convertToAlluxioURI(target, mountRoot); + + try { + if (!alluxioClient.exists(sourceUri)) { + throw new IOException( + "Cannot rename file %s to %s as file %s doesn't exist".formatted(source, target, source)); + } + if (alluxioClient.exists(targetUri)) { + throw new IOException( + "Cannot rename file %s to %s as file %s already exists".formatted(source, target, target)); + } + URIStatus status = alluxioClient.getStatus(sourceUri); + if (status.isFolder()) { + throw new IOException( + "Cannot rename file %s to %s as %s is a directory".formatted(source, target, source)); + } + alluxioClient.rename(convertToAlluxioURI(source, mountRoot), convertToAlluxioURI(target, mountRoot)); + } + catch (AlluxioException e) { + throw new IOException("Error renameFile from %s to %s".formatted(source, target), e); + } + } + + @Override + public FileIterator listFiles(Location location) + throws IOException + { + try { + URIStatus status = alluxioClient.getStatus(convertToAlluxioURI(location, mountRoot)); + if (status == null) { + new AlluxioFileIterator(Collections.emptyList(), mountRoot); + } + if (!status.isFolder()) { + throw new IOException("Location is not a directory: %s".formatted(location)); + } + } + catch (NotFoundRuntimeException | AlluxioException e) { + return new AlluxioFileIterator(Collections.emptyList(), mountRoot); + } + + try { + List filesStatus = alluxioClient.listStatus(convertToAlluxioURI(location, mountRoot), + ListStatusPOptions.newBuilder().setRecursive(true).build()); + return new AlluxioFileIterator(filesStatus.stream().filter(status -> !status.isFolder() & status.isCompleted()).toList(), mountRoot); + } + catch (AlluxioException e) { + throw new IOException("Error listFiles %s".formatted(location), e); + } + } + + @Override + public Optional directoryExists(Location location) + throws IOException + { + if (location.path().isEmpty()) { + return Optional.of(true); + } + try { + URIStatus status = alluxioClient.getStatus(convertToAlluxioURI(location, mountRoot)); + if (status != null && status.isFolder()) { + return Optional.of(true); + } + return Optional.of(false); + } + catch (FileDoesNotExistException | FileNotFoundException | NotFoundRuntimeException e) { + return Optional.of(false); + } + catch (AlluxioException e) { + throw new IOException("Error directoryExists %s".formatted(location), e); + } + } + + @Override + public void createDirectory(Location location) + throws IOException + { + try { + AlluxioURI locationUri = convertToAlluxioURI(location, mountRoot); + if (alluxioClient.exists(locationUri)) { + URIStatus status = alluxioClient.getStatus(locationUri); + if (!status.isFolder()) { + throw new IOException( + "Cannot create a directory for an existing file location %s".formatted(location)); + } + } + alluxioClient.createDirectory( + locationUri, + CreateDirectoryPOptions.newBuilder() + .setAllowExists(true) + .setRecursive(true) + .build()); + } + catch (AlluxioException e) { + throw new IOException("Error createDirectory %s".formatted(location), e); + } + } + + @Override + public void renameDirectory(Location source, Location target) + throws IOException + { + try { + ensureNotRootLocation(source); + ensureNotRootLocation(target); + } + catch (IllegalStateException e) { + throw new IOException( + "Cannot rename directory from %s to %s as one of them is root location".formatted(source, target), e); + } + try { + if (alluxioClient.exists(convertToAlluxioURI(target, mountRoot))) { + throw new IOException( + "Cannot rename %s to %s as file %s already exists".formatted(source, target, target)); + } + alluxioClient.rename(convertToAlluxioURI(source, mountRoot), convertToAlluxioURI(target, mountRoot)); + } + catch (AlluxioException e) { + throw new IOException("Error renameDirectory from %s to %s".formatted(source, target), e); + } + } + + @Override + public Set listDirectories(Location location) + throws IOException + { + try { + if (isFile(location)) { + throw new IOException("Cannot list directories for a file %s".formatted(location)); + } + List filesStatus = alluxioClient.listStatus(convertToAlluxioURI(location, mountRoot)); + return filesStatus.stream() + .filter(URIStatus::isFolder) + .map((URIStatus fileStatus) -> AlluxioUtils.convertToLocation(fileStatus, mountRoot)) + .map(loc -> { + if (!loc.toString().endsWith("/")) { + return Location.of(loc + "/"); + } + else { + return loc; + } + }) + .collect(Collectors.toSet()); + } + catch (FileDoesNotExistException | FileNotFoundException | NotFoundRuntimeException e) { + return Collections.emptySet(); + } + catch (AlluxioException e) { + throw new IOException("Error listDirectories %s".formatted(location), e); + } + } + + @Override + public Optional createTemporaryDirectory(Location targetPath, String temporaryPrefix, String relativePrefix) + throws IOException + { + // allow for absolute or relative temporary prefix + Location temporary; + if (temporaryPrefix.startsWith("/")) { + String prefix = temporaryPrefix; + while (prefix.startsWith("/")) { + prefix = prefix.substring(1); + } + temporary = targetPath.appendPath(prefix); + } + else { + temporary = targetPath.appendPath(temporaryPrefix); + } + + temporary = temporary.appendPath(randomUUID().toString()); + + createDirectory(temporary); + return Optional.of(temporary); + } + + private void ensureNotRootLocation(Location location) + { + String locationPath = location.path(); + while (locationPath.endsWith("/")) { + locationPath = locationPath.substring(0, locationPath.length() - 1); + } + + String rootLocationPath = rootLocation.path(); + while (rootLocationPath.endsWith("/")) { + rootLocationPath = rootLocationPath.substring(0, rootLocationPath.length() - 1); + } + + if (rootLocationPath.equals(locationPath)) { + throw new IllegalStateException("Illegal operation on %s".formatted(location)); + } + } + + private void ensureNotEndWithSlash(Location location) + { + String locationPath = location.path(); + if (locationPath.endsWith("/")) { + throw new IllegalStateException("Illegal operation on %s".formatted(location)); + } + } + + private boolean isFile(Location location) + { + try { + URIStatus status = alluxioClient.getStatus(convertToAlluxioURI(location, mountRoot)); + if (status == null) { + return false; + } + return !status.isFolder(); + } + catch (NotFoundRuntimeException | AlluxioException | IOException e) { + return false; + } + } +} diff --git a/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystemFactory.java b/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystemFactory.java new file mode 100644 index 000000000000..cc7b6303e061 --- /dev/null +++ b/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystemFactory.java @@ -0,0 +1,48 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.filesystem.alluxio; + +import alluxio.client.file.FileSystem; +import alluxio.client.file.FileSystemContext; +import alluxio.conf.AlluxioConfiguration; +import alluxio.conf.Configuration; +import com.google.inject.Inject; +import io.trino.filesystem.TrinoFileSystem; +import io.trino.filesystem.TrinoFileSystemFactory; +import io.trino.spi.security.ConnectorIdentity; + +public class AlluxioFileSystemFactory + implements TrinoFileSystemFactory +{ + private final AlluxioConfiguration conf; + + @Inject + public AlluxioFileSystemFactory() + { + this(Configuration.global()); + } + + public AlluxioFileSystemFactory(AlluxioConfiguration conf) + { + this.conf = conf; + } + + @Override + public TrinoFileSystem create(ConnectorIdentity identity) + { + FileSystemContext fsContext = FileSystemContext.create(conf); + FileSystem alluxioClient = FileSystem.Factory.create(fsContext); + return new AlluxioFileSystem(alluxioClient); + } +} diff --git a/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystemInput.java b/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystemInput.java new file mode 100644 index 000000000000..44843c5ad587 --- /dev/null +++ b/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystemInput.java @@ -0,0 +1,86 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.filesystem.alluxio; + +import alluxio.client.file.FileInStream; +import io.trino.filesystem.TrinoInput; +import io.trino.filesystem.TrinoInputFile; + +import java.io.IOException; + +import static java.lang.Math.min; +import static java.util.Objects.checkFromIndexSize; +import static java.util.Objects.requireNonNull; + +public class AlluxioFileSystemInput + implements TrinoInput +{ + private final FileInStream stream; + private final TrinoInputFile inputFile; + + private volatile boolean closed; + + public AlluxioFileSystemInput(FileInStream stream, TrinoInputFile inputFile) + { + this.stream = requireNonNull(stream, "stream is null"); + this.inputFile = requireNonNull(inputFile, "inputFile is null"); + } + + @Override + public void readFully(long position, byte[] buffer, int bufferOffset, int bufferLength) + throws IOException + { + ensureOpen(); + checkFromIndexSize(bufferOffset, bufferLength, buffer.length); + if (position + bufferLength > inputFile.length()) { + throw new IOException("readFully position overflow %s. pos %d + buffer length %d > file size %d" + .formatted(inputFile.location(), position, bufferLength, inputFile.length())); + } + stream.positionedRead(position, buffer, bufferOffset, bufferLength); + } + + @Override + public int readTail(byte[] buffer, int bufferOffset, int bufferLength) + throws IOException + { + ensureOpen(); + checkFromIndexSize(bufferOffset, bufferLength, buffer.length); + long fileSize = inputFile.length(); + int readSize = (int) min(fileSize, bufferLength); + readFully(fileSize - readSize, buffer, bufferOffset, readSize); + return readSize; + } + + @Override + public String toString() + { + return inputFile.toString(); + } + + @Override + public void close() + throws IOException + { + closed = true; + stream.close(); + } + + private void ensureOpen() + throws IOException + { + if (closed) { + throw new IOException("Input stream closed: " + this); + } + } +} diff --git a/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystemInputFile.java b/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystemInputFile.java new file mode 100644 index 000000000000..ab9e59b5d836 --- /dev/null +++ b/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystemInputFile.java @@ -0,0 +1,162 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.filesystem.alluxio; + +import alluxio.client.file.FileInStream; +import alluxio.client.file.FileSystem; +import alluxio.client.file.URIStatus; +import alluxio.exception.AlluxioException; +import alluxio.exception.FileDoesNotExistException; +import alluxio.exception.runtime.NotFoundRuntimeException; +import alluxio.grpc.OpenFilePOptions; +import io.trino.filesystem.Location; +import io.trino.filesystem.TrinoInput; +import io.trino.filesystem.TrinoInputFile; +import io.trino.filesystem.TrinoInputStream; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.time.Instant; +import java.util.Optional; + +import static io.trino.filesystem.alluxio.AlluxioUtils.convertToAlluxioURI; +import static java.util.Objects.requireNonNull; + +public class AlluxioFileSystemInputFile + implements TrinoInputFile +{ + private final Location location; + private final FileSystem fileSystem; + private final String mountRoot; + + private Optional lastModified; + private Long length; + private URIStatus status; + + public AlluxioFileSystemInputFile(Location location, Long length, FileSystem fileSystem, String mountRoot, Optional lastModified) + { + this.location = requireNonNull(location, "location is null"); + this.fileSystem = requireNonNull(fileSystem, "fileSystem is null"); + this.mountRoot = requireNonNull(mountRoot, "mountRoot is null"); + this.length = length; + this.lastModified = requireNonNull(lastModified, "lastModified is null"); + } + + @Override + public TrinoInput newInput() + throws IOException + { + try { + return new AlluxioFileSystemInput(openFile(), this); + } + catch (AlluxioException e) { + throw new IOException("Error newInput() file: %s".formatted(location), e); + } + } + + @Override + public TrinoInputStream newStream() + throws IOException + { + try { + return new AlluxioTrinoInputStream(location, openFile(), getURIStatus()); + } + catch (AlluxioException e) { + throw new IOException("Error newStream() file: %s".formatted(location), e); + } + } + + private FileInStream openFile() + throws IOException, AlluxioException + { + if (!exists()) { + throw new FileNotFoundException("File does not exist: " + location); + } + return fileSystem.openFile(getURIStatus(), OpenFilePOptions.getDefaultInstance()); + } + + private void loadFileStatus() + throws IOException + { + if (status == null) { + URIStatus fileStatus = getURIStatus(); + if (length == null) { + length = fileStatus.getLength(); + } + if (lastModified.isEmpty()) { + lastModified = Optional.of(Instant.ofEpochMilli(fileStatus.getLastModificationTimeMs())); + } + } + } + + private URIStatus getURIStatus() + throws IOException + { + try { + //TODO: create a URIStatus object based on the location field + status = fileSystem.getStatus(convertToAlluxioURI(location, mountRoot)); + } + catch (FileDoesNotExistException | NotFoundRuntimeException e) { + throw new FileNotFoundException("File does not exist: %s".formatted(location)); + } + catch (AlluxioException | IOException e) { + throw new IOException("Get status for file %s failed: %s".formatted(location, e.getMessage()), e); + } + return status; + } + + @Override + public long length() + throws IOException + { + if (length == null) { + loadFileStatus(); + } + return requireNonNull(length, "length is null"); + } + + @Override + public Instant lastModified() + throws IOException + { + if (lastModified.isEmpty()) { + loadFileStatus(); + } + return lastModified.orElseThrow(); + } + + @Override + public boolean exists() + throws IOException + { + try { + return fileSystem.exists(convertToAlluxioURI(location, mountRoot)); + } + catch (AlluxioException e) { + throw new IOException("fail to check file existence", e); + } + } + + @Override + public Location location() + { + return location; + } + + @Override + public String toString() + { + return location().toString(); + } +} diff --git a/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystemModule.java b/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystemModule.java new file mode 100644 index 000000000000..7a61711762c5 --- /dev/null +++ b/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystemModule.java @@ -0,0 +1,29 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.filesystem.alluxio; + +import com.google.inject.Binder; +import com.google.inject.Module; + +import static com.google.inject.Scopes.SINGLETON; + +public class AlluxioFileSystemModule + implements Module +{ + @Override + public void configure(Binder binder) + { + binder.bind(AlluxioFileSystemFactory.class).in(SINGLETON); + } +} diff --git a/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystemOutputFile.java b/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystemOutputFile.java new file mode 100644 index 000000000000..c632fe081d7f --- /dev/null +++ b/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystemOutputFile.java @@ -0,0 +1,112 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.filesystem.alluxio; + +import alluxio.client.file.FileOutStream; +import alluxio.client.file.FileSystem; +import alluxio.exception.AlluxioException; +import alluxio.grpc.CreateFilePOptions; +import io.trino.filesystem.Location; +import io.trino.filesystem.TrinoOutputFile; +import io.trino.memory.context.AggregatedMemoryContext; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.file.FileAlreadyExistsException; + +import static io.trino.filesystem.alluxio.AlluxioUtils.convertToAlluxioURI; +import static java.util.Objects.requireNonNull; + +public class AlluxioFileSystemOutputFile + implements TrinoOutputFile +{ + private final Location rootLocation; + private final Location location; + private final FileSystem fileSystem; + private final String mountRoot; + + public AlluxioFileSystemOutputFile(Location rootLocation, Location location, FileSystem fileSystem, String mountRoot) + { + this.rootLocation = requireNonNull(rootLocation, "root location is null"); + this.location = requireNonNull(location, "location is null"); + this.fileSystem = requireNonNull(fileSystem, "fileSystem is null"); + this.mountRoot = requireNonNull(mountRoot, "mountRoot is null"); + } + + @Override + public void createOrOverwrite(byte[] data) + throws IOException + { + ensureOutputFileNotOutsideOfRoot(location); + try (FileOutStream outStream = fileSystem.createFile( + convertToAlluxioURI(location, mountRoot), + CreateFilePOptions.newBuilder().setOverwrite(true).setRecursive(true).build())) { + outStream.write(data); + } + catch (AlluxioException e) { + throw new IOException("Error createOrOverwrite %s".formatted(location), e); + } + } + + @Override + public OutputStream create(AggregatedMemoryContext memoryContext) + throws IOException + { + throwIfAlreadyExists(); + try { + return new AlluxioTrinoOutputStream( + location, + fileSystem.createFile( + convertToAlluxioURI(location, mountRoot), + CreateFilePOptions.newBuilder().setRecursive(true).build())); + } + catch (AlluxioException e) { + throw new IOException("Error create %s".formatted(location), e); + } + } + + @Override + public Location location() + { + return location; + } + + @Override + public String toString() + { + return location().toString(); + } + + private void ensureOutputFileNotOutsideOfRoot(Location location) + throws IOException + { + String path = AlluxioUtils.simplifyPath(location.path()); + if (rootLocation != null && !path.startsWith(rootLocation.path())) { + throw new IOException("Output file %s outside of root is not allowed".formatted(location)); + } + } + + private void throwIfAlreadyExists() + throws IOException + { + try { + if (fileSystem.exists(convertToAlluxioURI(location, mountRoot))) { + throw new FileAlreadyExistsException("File %s already exists".formatted(location)); + } + } + catch (AlluxioException e) { + throw new IOException("Error create %s".formatted(location), e); + } + } +} diff --git a/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioTrinoInputStream.java b/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioTrinoInputStream.java new file mode 100644 index 000000000000..e23d61c5cf61 --- /dev/null +++ b/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioTrinoInputStream.java @@ -0,0 +1,137 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.filesystem.alluxio; + +import alluxio.client.file.FileInStream; +import alluxio.client.file.URIStatus; +import io.trino.filesystem.Location; +import io.trino.filesystem.TrinoInputStream; + +import java.io.IOException; + +import static java.util.Objects.checkFromIndexSize; +import static java.util.Objects.requireNonNull; + +public final class AlluxioTrinoInputStream + extends TrinoInputStream +{ + private final Location location; + private final FileInStream stream; + private final URIStatus fileStatus; + + private boolean closed; + + public AlluxioTrinoInputStream(Location location, FileInStream stream, URIStatus fileStatus) + { + this.location = requireNonNull(location, "location is null"); + this.stream = requireNonNull(stream, "stream is null"); + this.fileStatus = requireNonNull(fileStatus, "fileStatus is null"); + } + + @Override + public long getPosition() + throws IOException + { + ensureOpen(); + try { + return stream.getPos(); + } + catch (IOException e) { + throw new IOException("Get position for file %s failed: %s".formatted(location, e.getMessage()), e); + } + } + + @Override + public void seek(long position) + throws IOException + { + ensureOpen(); + if (position < 0) { + throw new IOException("Negative seek offset"); + } + if (position > fileStatus.getLength()) { + throw new IOException("Cannot seek to %s. File size is %s: %s".formatted(position, fileStatus.getLength(), location)); + } + try { + stream.seek(position); + } + catch (IOException e) { + throw new IOException("Cannot seek to %s: %s".formatted(position, location)); + } + } + + @Override + public long skip(long n) + throws IOException + { + ensureOpen(); + try { + return stream.skip(n); + } + catch (IOException e) { + throw new IOException("Skipping %s bytes of file %s failed: %s".formatted(n, location, e.getMessage()), e); + } + } + + @Override + public int read() + throws IOException + { + ensureOpen(); + try { + return stream.read(); + } + catch (IOException e) { + throw new IOException("Read of file %s failed: %s".formatted(location, e.getMessage()), e); + } + } + + @Override + public int read(byte[] b, int off, int len) + throws IOException + { + ensureOpen(); + checkFromIndexSize(off, len, b.length); + try { + return stream.read(b, off, len); + } + catch (IOException e) { + throw new IOException("Read of file %s failed: %s".formatted(location, e.getMessage()), e); + } + } + + @Override + public void close() + throws IOException + { + closed = true; + stream.close(); + } + + private void ensureOpen() + throws IOException + { + if (closed) { + throw new IOException("Input stream closed: " + location); + } + } + + @Override + public int available() + throws IOException + { + ensureOpen(); + return super.available(); + } +} diff --git a/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioTrinoOutputStream.java b/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioTrinoOutputStream.java new file mode 100644 index 000000000000..0421b4229ea3 --- /dev/null +++ b/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioTrinoOutputStream.java @@ -0,0 +1,66 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.filesystem.alluxio; + +import io.trino.filesystem.Location; + +import java.io.IOException; +import java.io.OutputStream; + +public final class AlluxioTrinoOutputStream + extends OutputStream +{ + private final Location location; + private final OutputStream delegate; + + private volatile boolean closed; + + public AlluxioTrinoOutputStream(Location location, OutputStream delegate) + { + this.location = location; + this.delegate = delegate; + } + + @Override + public void write(int b) + throws IOException + { + ensureOpen(); + delegate.write(b); + } + + @Override + public void flush() + throws IOException + { + ensureOpen(); + delegate.flush(); + } + + @Override + public void close() + throws IOException + { + closed = true; + delegate.close(); + } + + private void ensureOpen() + throws IOException + { + if (closed) { + throw new IOException("Output stream for %s closed: ".formatted(location)); + } + } +} diff --git a/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioUtils.java b/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioUtils.java new file mode 100644 index 000000000000..493e808fb35e --- /dev/null +++ b/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioUtils.java @@ -0,0 +1,111 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.filesystem.alluxio; + +import alluxio.AlluxioURI; +import alluxio.client.file.URIStatus; +import io.trino.filesystem.Location; + +import java.util.ArrayDeque; +import java.util.Deque; +import java.util.Optional; + +public class AlluxioUtils +{ + private AlluxioUtils() {} + + public static Location convertToLocation(URIStatus fileStatus, String mountRoot) + { + if (fileStatus == null) { + return null; + } + String path = fileStatus.getPath(); + if (path == null) { + return null; + } + if (path.isEmpty()) { + return Location.of(""); + } + if (path.startsWith("alluxio://")) { + return Location.of(fileStatus.getPath()); + } + + String schema = "alluxio://"; + if (path.startsWith("/")) { + while (path.startsWith("/")) { + path = path.substring(1); + } + } + String mountRootWithSlash = mountRoot; + if (!mountRoot.endsWith("/")) { + mountRootWithSlash = mountRoot + "/"; + } + return Location.of(schema + mountRootWithSlash + path); + } + + public static String simplifyPath(String path) + { + // Use a deque to store the path components + Deque deque = new ArrayDeque<>(); + String[] segments = path.split("/"); + + for (String segment : segments) { + if (segment.isEmpty() || segment.equals(".")) { + // Ignore empty and current directory segments + continue; + } + if (segment.equals("..")) { + // If there's a valid directory to go back to, remove it + if (!deque.isEmpty()) { + deque.pollLast(); + } + } + else { + // Add the directory to the deque + deque.offerLast(segment); + } + } + + // Build the final simplified path from the deque + StringBuilder simplifiedPath = new StringBuilder(); + for (String dir : deque) { + simplifiedPath.append(dir).append("/"); + } + + // Retain trailing slash if it was in the original path + if (!path.endsWith("/") && simplifiedPath.length() > 0) { + simplifiedPath.setLength(simplifiedPath.length() - 1); + } + + return simplifiedPath.length() == 0 ? "" : simplifiedPath.toString(); + } + + public static AlluxioURI convertToAlluxioURI(Location location, String mountRoot) + { + Optional scheme = location.scheme(); + if (scheme.isPresent()) { + if (!scheme.get().equals("alluxio")) { + return new AlluxioURI(location.toString()); + } + } + String path = location.path(); + while (path.startsWith("/")) { + path = path.substring(1); + } + if (!mountRoot.endsWith("/")) { + mountRoot = mountRoot + "/"; + } + return new AlluxioURI(mountRoot + path); + } +} diff --git a/lib/trino-filesystem-alluxio/src/test/java/io/trino/filesystem/alluxio/AbstractTestAlluxioFileSystem.java b/lib/trino-filesystem-alluxio/src/test/java/io/trino/filesystem/alluxio/AbstractTestAlluxioFileSystem.java new file mode 100644 index 000000000000..f455f4a0e5dc --- /dev/null +++ b/lib/trino-filesystem-alluxio/src/test/java/io/trino/filesystem/alluxio/AbstractTestAlluxioFileSystem.java @@ -0,0 +1,118 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.filesystem.alluxio; + +import alluxio.AlluxioURI; +import alluxio.client.file.FileSystem; +import alluxio.client.file.FileSystemContext; +import alluxio.client.file.URIStatus; +import alluxio.conf.Configuration; +import alluxio.conf.InstancedConfiguration; +import alluxio.exception.AlluxioException; +import alluxio.grpc.DeletePOptions; +import io.trino.filesystem.AbstractTestTrinoFileSystem; +import io.trino.filesystem.Location; +import io.trino.filesystem.TrinoFileSystem; +import io.trino.spi.security.ConnectorIdentity; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.TestInstance; + +import java.io.IOException; + +import static org.assertj.core.api.Assertions.assertThat; + +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +public abstract class AbstractTestAlluxioFileSystem + extends AbstractTestTrinoFileSystem +{ + private TrinoFileSystem fileSystem; + private Location rootLocation; + private FileSystem alluxioFs; + private AlluxioFileSystemFactory alluxioFileSystemFactory; + + protected void initialize() + throws IOException + { + this.rootLocation = Location.of("alluxio:///"); + InstancedConfiguration conf = Configuration.copyGlobal(); + FileSystemContext fsContext = FileSystemContext.create(conf); + this.alluxioFs = FileSystem.Factory.create(fsContext); + this.alluxioFileSystemFactory = new AlluxioFileSystemFactory(conf); + this.fileSystem = alluxioFileSystemFactory.create(ConnectorIdentity.ofUser("alluxio")); + } + + @AfterAll + void tearDown() + { + fileSystem = null; + alluxioFs = null; + rootLocation = null; + alluxioFileSystemFactory = null; + } + + @AfterEach + void afterEach() + throws IOException, AlluxioException + { + AlluxioURI root = new AlluxioURI(getRootLocation().toString()); + + for (URIStatus status : alluxioFs.listStatus(root)) { + alluxioFs.delete(new AlluxioURI(status.getPath()), DeletePOptions.newBuilder().setRecursive(true).build()); + } + } + + @Override + protected boolean isHierarchical() + { + return true; + } + + @Override + protected TrinoFileSystem getFileSystem() + { + return fileSystem; + } + + @Override + protected Location getRootLocation() + { + return rootLocation; + } + + @Override + protected void verifyFileSystemIsEmpty() + { + AlluxioURI bucket = + AlluxioUtils.convertToAlluxioURI(rootLocation, ((AlluxioFileSystem) fileSystem).getMountRoot()); + try { + assertThat(alluxioFs.listStatus(bucket)).isEmpty(); + } + catch (IOException | AlluxioException e) { + throw new RuntimeException(e); + } + } + + @Override + protected final boolean supportsCreateExclusive() + { + return false; + } + + @Override + protected boolean supportsIncompleteWriteNoClobber() + { + return false; + } +} diff --git a/lib/trino-filesystem-alluxio/src/test/java/io/trino/filesystem/alluxio/TestAlluxioFileSystem.java b/lib/trino-filesystem-alluxio/src/test/java/io/trino/filesystem/alluxio/TestAlluxioFileSystem.java new file mode 100644 index 000000000000..188d7e917cab --- /dev/null +++ b/lib/trino-filesystem-alluxio/src/test/java/io/trino/filesystem/alluxio/TestAlluxioFileSystem.java @@ -0,0 +1,95 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.filesystem.alluxio; + +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; + +import java.io.IOException; + +import static org.assertj.core.api.Assertions.assertThat; + +@Testcontainers +public class TestAlluxioFileSystem + extends AbstractTestAlluxioFileSystem +{ + private static final String IMAGE_NAME = "alluxio/alluxio:2.9.5"; + public static final DockerImageName ALLUXIO_IMAGE = DockerImageName.parse(IMAGE_NAME); + + @Container + private static final GenericContainer ALLUXIO_MASTER_CONTAINER = createAlluxioMasterContainer(); + + @Container + private static final GenericContainer ALLUXIO_WORKER_CONTAINER = createAlluxioWorkerContainer(); + + private static GenericContainer createAlluxioMasterContainer() + { + GenericContainer container = new GenericContainer<>(ALLUXIO_IMAGE); + container.withCommand("master-only") + .withEnv("ALLUXIO_JAVA_OPTS", + "-Dalluxio.security.authentication.type=NOSASL " + + "-Dalluxio.master.hostname=localhost " + + "-Dalluxio.worker.hostname=localhost " + + "-Dalluxio.master.mount.table.root.ufs=/opt/alluxio/underFSStorage " + + "-Dalluxio.master.journal.type=NOOP " + + "-Dalluxio.security.authorization.permission.enabled=false " + + "-Dalluxio.security.authorization.plugins.enabled=false ") + .withNetworkMode("host") + .withAccessToHost(true) + .waitingFor(Wait.forLogMessage(".*Primary started*\n", 1)); + return container; + } + + private static GenericContainer createAlluxioWorkerContainer() + { + GenericContainer container = new GenericContainer<>(ALLUXIO_IMAGE); + container.withCommand("worker-only") + .withNetworkMode("host") + .withEnv("ALLUXIO_JAVA_OPTS", + "-Dalluxio.security.authentication.type=NOSASL " + + "-Dalluxio.worker.ramdisk.size=128MB " + + "-Dalluxio.worker.hostname=localhost " + + "-Dalluxio.worker.tieredstore.level0.alias=HDD " + + "-Dalluxio.worker.tieredstore.level0.dirs.path=/tmp " + + "-Dalluxio.master.hostname=localhost " + + "-Dalluxio.security.authorization.permission.enabled=false " + + "-Dalluxio.security.authorization.plugins.enabled=false ") + .withAccessToHost(true) + .dependsOn(ALLUXIO_MASTER_CONTAINER) + .waitingFor(Wait.forLogMessage(".*Alluxio worker started.*\n", 1)); + return container; + } + + @BeforeAll + void setup() + throws IOException + { + initialize(); + // the SSHD container will be stopped by TestContainers on shutdown + // https://github.com/trinodb/trino/discussions/21969 + System.setProperty("ReportLeakedContainers.disabled", "true"); + } + + @Test + void testContainer() + { + assertThat(ALLUXIO_MASTER_CONTAINER.isRunning()).isTrue(); + assertThat(ALLUXIO_WORKER_CONTAINER.isRunning()).isTrue(); + } +} diff --git a/lib/trino-filesystem-alluxio/src/test/java/io/trino/filesystem/alluxio/TestAlluxioUtils.java b/lib/trino-filesystem-alluxio/src/test/java/io/trino/filesystem/alluxio/TestAlluxioUtils.java new file mode 100644 index 000000000000..f38ec2f6db49 --- /dev/null +++ b/lib/trino-filesystem-alluxio/src/test/java/io/trino/filesystem/alluxio/TestAlluxioUtils.java @@ -0,0 +1,58 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.filesystem.alluxio; + +import alluxio.AlluxioURI; +import alluxio.client.file.URIStatus; +import alluxio.wire.FileInfo; +import io.trino.filesystem.Location; +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +public class TestAlluxioUtils +{ + @Test + public void testSimplifyPath() + { + String path = "test/level0-file0"; + assertThat(path).isEqualTo(AlluxioUtils.simplifyPath(path)); + path = "a/./b/../../c/"; + assertThat("c/").isEqualTo(AlluxioUtils.simplifyPath(path)); + } + + @Test + public void convertToLocation() + { + String mountRoot = "/"; + URIStatus fileStatus = new URIStatus(new FileInfo().setPath("/mnt/test/level0-file0")); + assertThat(Location.of("alluxio:///mnt/test/level0-file0")).isEqualTo(AlluxioUtils.convertToLocation(fileStatus, mountRoot)); + fileStatus = new URIStatus(new FileInfo().setPath("/mnt/test/level0/level1-file0")); + assertThat(Location.of("alluxio:///mnt/test/level0/level1-file0")).isEqualTo(AlluxioUtils.convertToLocation(fileStatus, mountRoot)); + fileStatus = new URIStatus(new FileInfo().setPath("/mnt/test2/level0/level1/level2-file0")); + assertThat(Location.of("alluxio:///mnt/test2/level0/level1/level2-file0")).isEqualTo(AlluxioUtils.convertToLocation(fileStatus, mountRoot)); + } + + @Test + public void testConvertToAlluxioURI() + { + Location location = Location.of("alluxio:///mnt/test/level0-file0"); + String mountRoot = "/"; + assertThat(new AlluxioURI("/mnt/test/level0-file0")).isEqualTo(AlluxioUtils.convertToAlluxioURI(location, mountRoot)); + location = Location.of("alluxio:///mnt/test/level0/level1-file0"); + assertThat(new AlluxioURI("/mnt/test/level0/level1-file0")).isEqualTo(AlluxioUtils.convertToAlluxioURI(location, mountRoot)); + location = Location.of("alluxio:///mnt/test2/level0/level1/level2-file0"); + assertThat(new AlluxioURI("/mnt/test2/level0/level1/level2-file0")).isEqualTo(AlluxioUtils.convertToAlluxioURI(location, mountRoot)); + } +} diff --git a/lib/trino-filesystem-manager/pom.xml b/lib/trino-filesystem-manager/pom.xml index 5eadbce9d3fb..429a0476bf48 100644 --- a/lib/trino-filesystem-manager/pom.xml +++ b/lib/trino-filesystem-manager/pom.xml @@ -42,6 +42,11 @@ trino-filesystem + + io.trino + trino-filesystem-alluxio + + io.trino trino-filesystem-azure diff --git a/lib/trino-filesystem-manager/src/main/java/io/trino/filesystem/manager/FileSystemConfig.java b/lib/trino-filesystem-manager/src/main/java/io/trino/filesystem/manager/FileSystemConfig.java index 4c5b639dfff2..7394476288d9 100644 --- a/lib/trino-filesystem-manager/src/main/java/io/trino/filesystem/manager/FileSystemConfig.java +++ b/lib/trino-filesystem-manager/src/main/java/io/trino/filesystem/manager/FileSystemConfig.java @@ -18,6 +18,7 @@ public class FileSystemConfig { private boolean hadoopEnabled; + private boolean alluxioEnabled; private boolean nativeAzureEnabled; private boolean nativeS3Enabled; private boolean nativeGcsEnabled; @@ -35,6 +36,18 @@ public FileSystemConfig setHadoopEnabled(boolean hadoopEnabled) return this; } + public boolean isAlluxioEnabled() + { + return alluxioEnabled; + } + + @Config("fs.alluxio.enabled") + public FileSystemConfig setAlluxioEnabled(boolean nativeAlluxioEnabled) + { + this.alluxioEnabled = nativeAlluxioEnabled; + return this; + } + public boolean isNativeAzureEnabled() { return nativeAzureEnabled; diff --git a/lib/trino-filesystem-manager/src/main/java/io/trino/filesystem/manager/FileSystemModule.java b/lib/trino-filesystem-manager/src/main/java/io/trino/filesystem/manager/FileSystemModule.java index 4312c9ab359f..d0e980ec5409 100644 --- a/lib/trino-filesystem-manager/src/main/java/io/trino/filesystem/manager/FileSystemModule.java +++ b/lib/trino-filesystem-manager/src/main/java/io/trino/filesystem/manager/FileSystemModule.java @@ -24,6 +24,8 @@ import io.trino.filesystem.Location; import io.trino.filesystem.TrinoFileSystemFactory; import io.trino.filesystem.alluxio.AlluxioFileSystemCacheModule; +import io.trino.filesystem.alluxio.AlluxioFileSystemFactory; +import io.trino.filesystem.alluxio.AlluxioFileSystemModule; import io.trino.filesystem.azure.AzureFileSystemFactory; import io.trino.filesystem.azure.AzureFileSystemModule; import io.trino.filesystem.cache.CacheFileSystemFactory; @@ -89,6 +91,11 @@ protected void setup(Binder binder) var factories = newMapBinder(binder, String.class, TrinoFileSystemFactory.class); + if (config.isAlluxioEnabled()) { + install(new AlluxioFileSystemModule()); + factories.addBinding("alluxio").to(AlluxioFileSystemFactory.class); + } + if (config.isNativeAzureEnabled()) { install(new AzureFileSystemModule()); factories.addBinding("abfs").to(AzureFileSystemFactory.class); diff --git a/lib/trino-filesystem-manager/src/test/java/io/trino/filesystem/manager/TestFileSystemConfig.java b/lib/trino-filesystem-manager/src/test/java/io/trino/filesystem/manager/TestFileSystemConfig.java index 9609484c0a5b..088d7653d53c 100644 --- a/lib/trino-filesystem-manager/src/test/java/io/trino/filesystem/manager/TestFileSystemConfig.java +++ b/lib/trino-filesystem-manager/src/test/java/io/trino/filesystem/manager/TestFileSystemConfig.java @@ -29,6 +29,7 @@ public void testDefaults() { assertRecordedDefaults(recordDefaults(FileSystemConfig.class) .setHadoopEnabled(false) + .setAlluxioEnabled(false) .setNativeAzureEnabled(false) .setNativeS3Enabled(false) .setNativeGcsEnabled(false) @@ -40,6 +41,7 @@ public void testExplicitPropertyMappings() { Map properties = ImmutableMap.builder() .put("fs.hadoop.enabled", "true") + .put("fs.alluxio.enabled", "true") .put("fs.native-azure.enabled", "true") .put("fs.native-s3.enabled", "true") .put("fs.native-gcs.enabled", "true") @@ -48,6 +50,7 @@ public void testExplicitPropertyMappings() FileSystemConfig expected = new FileSystemConfig() .setHadoopEnabled(true) + .setAlluxioEnabled(true) .setNativeAzureEnabled(true) .setNativeS3Enabled(true) .setNativeGcsEnabled(true) diff --git a/pom.xml b/pom.xml index 6ab39ea9ebd4..1f3202abf0ca 100644 --- a/pom.xml +++ b/pom.xml @@ -43,6 +43,7 @@ lib/trino-array lib/trino-cache lib/trino-filesystem + lib/trino-filesystem-alluxio lib/trino-filesystem-azure lib/trino-filesystem-cache-alluxio lib/trino-filesystem-gcs @@ -1101,6 +1102,12 @@ test-jar + + io.trino + trino-filesystem-alluxio + ${project.version} + + io.trino trino-filesystem-azure @@ -1729,14 +1736,6 @@ alluxio-core-client-fs ${dep.alluxio.version} - - io.grpc - grpc-core - - - io.grpc - grpc-stub - org.alluxio alluxio-core-common @@ -1745,10 +1744,34 @@ org.alluxio alluxio-core-transport + + org.apache.logging.log4j + log4j-api + org.apache.logging.log4j log4j-core + + org.apache.logging.log4j + log4j-slf4j-impl + + + org.rocksdb + rocksdbjni + + + org.slf4j + log4j-over-slf4j + + + org.slf4j + slf4j-api + + + org.slf4j + slf4j-reload4j + @@ -1757,10 +1780,18 @@ alluxio-core-common ${dep.alluxio.version} + + com.amazonaws + aws-java-sdk-core + com.rabbitmq amqp-client + + commons-cli + commons-cli + commons-logging commons-logging @@ -1770,28 +1801,12 @@ jetcd-core - io.grpc - grpc-api - - - io.grpc - grpc-core + io.swagger + swagger-annotations - io.grpc - grpc-netty - - - io.grpc - grpc-services - - - io.grpc - grpc-stub - - - io.netty - netty-tcnative-boringssl-static + jakarta.servlet + jakarta.servlet-api org.alluxio @@ -1817,6 +1832,58 @@ org.eclipse.jetty jetty-servlet + + org.reflections + reflections + + + org.slf4j + slf4j-api + + + org.slf4j + slf4j-reload4j + + + + + + org.alluxio + alluxio-core-transport + ${dep.alluxio.version} + + + org.apache.logging.log4j + log4j-core + + + org.apache.logging.log4j + log4j-slf4j-impl + + + org.slf4j + log4j-over-slf4j + + + org.slf4j + slf4j-api + + + org.slf4j + slf4j-reload4j + + + + + + org.alluxio + alluxio-underfs-local + ${dep.alluxio.version} + + + org.apache.logging.log4j + log4j-core + @@ -2567,6 +2634,10 @@ org.alluxio alluxio-core-transport + + org.alluxio + alluxio-underfs-local + git.properties