From 236753ca0c3db976943dfdaf00f169a152c7b8fc Mon Sep 17 00:00:00 2001 From: duripeng <453243496@qq.com> Date: Sun, 3 Dec 2023 05:16:32 +0800 Subject: [PATCH 1/2] [Enhance](fe) Iceberg table in HMS catalog supports broker scan --- .../iceberg/broker/BrokerInputFile.java | 74 ++++++++ .../iceberg/broker/BrokerInputStream.java | 169 ++++++++++++++++++ .../iceberg/broker/IcebergBrokerIO.java | 80 +++++++++ .../planner/external/FileQueryScanNode.java | 9 +- .../iceberg/IcebergMetadataCache.java | 19 +- 5 files changed, 343 insertions(+), 8 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/broker/BrokerInputFile.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/broker/BrokerInputStream.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/broker/IcebergBrokerIO.java diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/broker/BrokerInputFile.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/broker/BrokerInputFile.java new file mode 100644 index 00000000000000..529df6c0af10a2 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/broker/BrokerInputFile.java @@ -0,0 +1,74 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.iceberg.broker; + +import org.apache.doris.analysis.BrokerDesc; +import org.apache.doris.common.util.BrokerReader; +import org.apache.doris.thrift.TBrokerFD; + +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.SeekableInputStream; + +import java.io.IOException; + +public class BrokerInputFile implements InputFile { + + private Long fileLength = null; + + private final String filePath; + private final BrokerDesc brokerDesc; + private BrokerReader reader; + private TBrokerFD fd; + + private BrokerInputFile(String filePath, BrokerDesc brokerDesc) { + this.filePath = filePath; + this.brokerDesc = brokerDesc; + } + + private void init() throws IOException { + this.reader = BrokerReader.create(this.brokerDesc); + this.fileLength = this.reader.getFileLength(filePath); + this.fd = this.reader.open(filePath); + } + + public static BrokerInputFile create(String filePath, BrokerDesc brokerDesc) throws IOException { + BrokerInputFile inputFile = new BrokerInputFile(filePath, brokerDesc); + inputFile.init(); + return inputFile; + } + + @Override + public long getLength() { + return fileLength; + } + + @Override + public SeekableInputStream newStream() { + return new BrokerInputStream(this.reader, this.fd, this.fileLength); + } + + @Override + public String location() { + return filePath; + } + + @Override + public boolean exists() { + return fileLength != null; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/broker/BrokerInputStream.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/broker/BrokerInputStream.java new file mode 100644 index 00000000000000..b9f6d30aed4fd3 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/broker/BrokerInputStream.java @@ -0,0 +1,169 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or 
more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.iceberg.broker; + +import org.apache.doris.common.util.BrokerReader; +import org.apache.doris.thrift.TBrokerFD; + +import org.apache.iceberg.io.SeekableInputStream; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.IOException; + +public class BrokerInputStream extends SeekableInputStream { + private static final Logger LOG = LogManager.getLogger(BrokerInputStream.class); + private static final int COPY_BUFFER_SIZE = 1024 * 1024; // 1MB + + private final byte[] tmpBuf = new byte[COPY_BUFFER_SIZE]; + private long currentPos = 0; + private long markPos = 0; + + private long bufferOffset = 0; + private long bufferLimit = 0; + private final BrokerReader reader; + private final TBrokerFD fd; + private final long fileLength; + + public BrokerInputStream(BrokerReader reader, TBrokerFD fd, long fileLength) { + this.fd = fd; + this.reader = reader; + this.fileLength = fileLength; + } + + @Override + public long getPos() throws IOException { + return currentPos; + } + + @Override + public void seek(long newPos) throws IOException { + currentPos = newPos; + } + + @Override + public int read() throws IOException { + try { + if (currentPos < bufferOffset || 
currentPos > bufferLimit || bufferOffset >= bufferLimit) { + bufferOffset = currentPos; + fill(); + } + if (currentPos > bufferLimit) { + LOG.warn("current pos {} is larger than buffer limit {}." + + " should not happen.", currentPos, bufferLimit); + return -1; + } + + int pos = (int) (currentPos - bufferOffset); + int res = Byte.toUnsignedInt(tmpBuf[pos]); + ++currentPos; + return res; + } catch (BrokerReader.EOFException e) { + return -1; + } + } + + @SuppressWarnings("NullableProblems") + @Override + public int read(byte[] b) throws IOException { + try { + byte[] data = reader.pread(fd, currentPos, b.length); + System.arraycopy(data, 0, b, 0, data.length); + currentPos += data.length; + return data.length; + } catch (BrokerReader.EOFException e) { + return -1; + } + } + + @SuppressWarnings("NullableProblems") + @Override + public int read(byte[] b, int off, int len) throws IOException { + try { + if (currentPos < bufferOffset || currentPos > bufferLimit || currentPos + len > bufferLimit) { + if (len > COPY_BUFFER_SIZE) { + // the data to be read is larger than max size of buffer. + // read it directly. + byte[] data = reader.pread(fd, currentPos, len); + System.arraycopy(data, 0, b, off, data.length); + currentPos += data.length; + return data.length; + } + // fill the buffer first + bufferOffset = currentPos; + fill(); + } + + if (currentPos > bufferLimit) { + LOG.warn("current pos {} is larger than buffer limit {}." 
+ + " should not happen.", currentPos, bufferLimit); + return -1; + } + + int start = (int) (currentPos - bufferOffset); + int readLen = Math.min(len, (int) (bufferLimit - bufferOffset)); + System.arraycopy(tmpBuf, start, b, off, readLen); + currentPos += readLen; + return readLen; + } catch (BrokerReader.EOFException e) { + return -1; + } + } + + private void fill() throws IOException, BrokerReader.EOFException { + if (bufferOffset == this.fileLength) { + throw new BrokerReader.EOFException(); + } + byte[] data = reader.pread(fd, bufferOffset, COPY_BUFFER_SIZE); + System.arraycopy(data, 0, tmpBuf, 0, data.length); + bufferLimit = bufferOffset + data.length; + } + + @Override + public long skip(long n) throws IOException { + final long left = fileLength - currentPos; + long min = Math.min(n, left); + currentPos += min; + return min; + } + + @Override + public int available() throws IOException { + return 0; + } + + @Override + public void close() throws IOException { + reader.close(fd); + } + + @Override + public synchronized void mark(int readlimit) { + markPos = currentPos; + } + + @Override + public synchronized void reset() throws IOException { + currentPos = markPos; + } + + @Override + public boolean markSupported() { + return true; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/broker/IcebergBrokerIO.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/broker/IcebergBrokerIO.java new file mode 100644 index 00000000000000..ec3b4f170301b1 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/broker/IcebergBrokerIO.java @@ -0,0 +1,80 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.iceberg.broker; + +import org.apache.doris.analysis.BrokerDesc; +import org.apache.doris.datasource.HMSExternalCatalog; + +import com.google.common.collect.ImmutableMap; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.util.SerializableMap; + +import java.io.IOException; +import java.util.Map; + +/** + * FileIO implementation that uses broker to execute Iceberg files IO operation. 
+ */ +public class IcebergBrokerIO implements FileIO { + + private SerializableMap properties = SerializableMap.copyOf(ImmutableMap.of()); + private BrokerDesc brokerDesc = null; + + @Override + public void initialize(Map props) { + this.properties = SerializableMap.copyOf(props); + if (!properties.containsKey(HMSExternalCatalog.BIND_BROKER_NAME)) { + throw new UnsupportedOperationException(String.format("No broker is specified, " + + "try to set '%s' in HMS Catalog", HMSExternalCatalog.BIND_BROKER_NAME)); + } + String brokerName = properties.get(HMSExternalCatalog.BIND_BROKER_NAME); + this.brokerDesc = new BrokerDesc(brokerName, properties.immutableMap()); + } + + @Override + public Map properties() { + return properties.immutableMap(); + } + + @Override + public InputFile newInputFile(String path) { + if (brokerDesc == null) { + throw new UnsupportedOperationException("IcebergBrokerIO should be initialized first"); + } + try { + return BrokerInputFile.create(path, brokerDesc); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public OutputFile newOutputFile(String path) { + throw new UnsupportedOperationException("IcebergBrokerIO does not support writing files"); + } + + @Override + public void deleteFile(String path) { + throw new UnsupportedOperationException("IcebergBrokerIO does not support deleting files"); + } + + @Override + public void close() { } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java index 9bae707c2f3ed8..1da2c789bcf3ba 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java @@ -34,6 +34,7 @@ import org.apache.doris.common.UserException; import org.apache.doris.common.util.BrokerUtil; import org.apache.doris.common.util.Util; +import 
org.apache.doris.datasource.HMSExternalCatalog; import org.apache.doris.datasource.hive.AcidInfo; import org.apache.doris.datasource.hive.AcidInfo.DeleteDeltaInfo; import org.apache.doris.nereids.glue.translator.PlanTranslatorContext; @@ -313,7 +314,13 @@ public void createScanRangeLocations() throws UserException { List pathPartitionKeys = getPathPartitionKeys(); for (Split split : inputSplits) { FileSplit fileSplit = (FileSplit) split; - TFileType locationType = getLocationType(fileSplit.getPath().toString()); + TFileType locationType; + if (fileSplit instanceof IcebergSplit + && ((IcebergSplit) fileSplit).getConfig().containsKey(HMSExternalCatalog.BIND_BROKER_NAME)) { + locationType = TFileType.FILE_BROKER; + } else { + locationType = getLocationType(fileSplit.getPath().toString()); + } TScanRangeLocations curLocations = newLocations(); // If fileSplit has partition values, use the values collected from hive partitions. diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergMetadataCache.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergMetadataCache.java index 5f79623ff4ad9a..b691f2a21819c5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergMetadataCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergMetadataCache.java @@ -201,15 +201,20 @@ private Table createIcebergTable(String uri, Map hdfsConf, Strin HiveCatalog hiveCatalog = new HiveCatalog(); hiveCatalog.setConf(conf); - Map catalogProperties = new HashMap<>(); - catalogProperties.put(HMSProperties.HIVE_METASTORE_URIS, uri); - catalogProperties.put("uri", uri); - hiveCatalog.initialize("hive", catalogProperties); - + if (props.containsKey(HMSExternalCatalog.BIND_BROKER_NAME)) { + // Set Iceberg FileIO implementation as `IcebergBrokerIO` when Catalog binding broker is specified. 
+ props.put("io-impl", "org.apache.doris.datasource.iceberg.broker.IcebergBrokerIO"); + props.put(HMSProperties.HIVE_METASTORE_URIS, uri); + props.put("uri", uri); + hiveCatalog.initialize("hive", props); + } else { + Map catalogProperties = new HashMap<>(); + catalogProperties.put(HMSProperties.HIVE_METASTORE_URIS, uri); + catalogProperties.put("uri", uri); + hiveCatalog.initialize("hive", catalogProperties); + } Table table = HiveMetaStoreClientHelper.ugiDoAs(conf, () -> hiveCatalog.loadTable(TableIdentifier.of(db, tbl))); - initIcebergTableFileIO(table, props); - return table; } From 57432ebd2ecc4c7d9567246c22c1fccaf74f03c9 Mon Sep 17 00:00:00 2001 From: duripeng <453243496@qq.com> Date: Sun, 3 Dec 2023 05:16:32 +0800 Subject: [PATCH 2/2] [Enhance](fe) Iceberg table in HMS catalog supports broker scan --- docs/en/docs/lakehouse/multi-catalog/hive.md | 7 +++++++ docs/zh-CN/docs/lakehouse/multi-catalog/hive.md | 6 ++++++ .../apache/doris/planner/external/FileQueryScanNode.java | 1 + .../planner/external/iceberg/IcebergMetadataCache.java | 2 -- 4 files changed, 14 insertions(+), 2 deletions(-) diff --git a/docs/en/docs/lakehouse/multi-catalog/hive.md b/docs/en/docs/lakehouse/multi-catalog/hive.md index 25fddea125064d..01c78d3599f8f4 100644 --- a/docs/en/docs/lakehouse/multi-catalog/hive.md +++ b/docs/en/docs/lakehouse/multi-catalog/hive.md @@ -391,6 +391,13 @@ Add following setting when creating an HMS catalog, file splitting and scanning "broker.name" = "test_broker" ``` + +Doris has implemented Broker query support for HMS Catalog Iceberg based on the Iceberg `FileIO` interface. If needed, the following configuration can be added when creating the HMS Catalog. + +```sql +"io-impl" = "org.apache.doris.datasource.iceberg.broker.IcebergBrokerIO" +``` + ## Integrate with Apache Ranger Apache Ranger is a security framework for monitoring, enabling services, and comprehensive data security access management on the Hadoop platform. 
diff --git a/docs/zh-CN/docs/lakehouse/multi-catalog/hive.md b/docs/zh-CN/docs/lakehouse/multi-catalog/hive.md index e75977c25f4766..b4efb291f89ed6 100644 --- a/docs/zh-CN/docs/lakehouse/multi-catalog/hive.md +++ b/docs/zh-CN/docs/lakehouse/multi-catalog/hive.md @@ -373,6 +373,12 @@ CREATE CATALOG hive PROPERTIES ( "broker.name" = "test_broker" ``` +Doris 基于 Iceberg `FileIO` 接口实现了 Broker 查询 HMS Catalog Iceberg 的支持。如有需求,可以在创建 HMS Catalog 时增加如下配置。 + +```sql +"io-impl" = "org.apache.doris.datasource.iceberg.broker.IcebergBrokerIO" +``` + ## 使用 Ranger 进行权限校验 Apache Ranger是一个用来在Hadoop平台上进行监控,启用服务,以及全方位数据安全访问管理的安全框架。 diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java index 1da2c789bcf3ba..b54b05f47e194a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java @@ -39,6 +39,7 @@ import org.apache.doris.datasource.hive.AcidInfo.DeleteDeltaInfo; import org.apache.doris.nereids.glue.translator.PlanTranslatorContext; import org.apache.doris.planner.PlanNodeId; +import org.apache.doris.planner.external.iceberg.IcebergSplit; import org.apache.doris.qe.ConnectContext; import org.apache.doris.spi.Split; import org.apache.doris.statistics.StatisticalType; diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergMetadataCache.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergMetadataCache.java index b691f2a21819c5..91a208202d05e8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergMetadataCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergMetadataCache.java @@ -202,8 +202,6 @@ private Table createIcebergTable(String uri, Map hdfsConf, Strin hiveCatalog.setConf(conf); if 
(props.containsKey(HMSExternalCatalog.BIND_BROKER_NAME)) { - // Set Iceberg FileIO implementation as `IcebergBrokerIO` when Catalog binding broker is specified. - props.put("io-impl", "org.apache.doris.datasource.iceberg.broker.IcebergBrokerIO"); props.put(HMSProperties.HIVE_METASTORE_URIS, uri); props.put("uri", uri); hiveCatalog.initialize("hive", props);