[Enhance](fe) Iceberg table in HMS catalog supports broker scan #28107

Merged 2 commits on Jan 3, 2024
7 changes: 7 additions & 0 deletions docs/en/docs/lakehouse/multi-catalog/hive.md
@@ -391,6 +391,13 @@ Add following setting when creating an HMS catalog, file splitting and scanning
"broker.name" = "test_broker"
```

Doris supports querying Iceberg tables in an HMS Catalog through a Broker, based on the Iceberg `FileIO` interface. If needed, add the following property when creating the HMS Catalog:

```sql
"io-impl" = "org.apache.doris.datasource.iceberg.broker.IcebergBrokerIO"
```
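
For orientation, a complete catalog definition combining this property with a broker binding might look like the sketch below; the catalog name, broker name, and metastore URI are illustrative placeholders, not values taken from this PR.

```sql
CREATE CATALOG iceberg_hms PROPERTIES (
    "type" = "hms",
    "hive.metastore.uris" = "thrift://127.0.0.1:9083",
    "broker.name" = "test_broker",
    "io-impl" = "org.apache.doris.datasource.iceberg.broker.IcebergBrokerIO"
);
```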

## Integrate with Apache Ranger

Apache Ranger is a security framework to enable, monitor, and manage comprehensive data security access across the Hadoop platform.
6 changes: 6 additions & 0 deletions docs/zh-CN/docs/lakehouse/multi-catalog/hive.md
@@ -373,6 +373,12 @@ CREATE CATALOG hive PROPERTIES (
"broker.name" = "test_broker"
```

Doris supports querying Iceberg tables in an HMS Catalog through a Broker, based on the Iceberg `FileIO` interface. If needed, add the following configuration when creating the HMS Catalog:

```sql
"io-impl" = "org.apache.doris.datasource.iceberg.broker.IcebergBrokerIO"
```
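
Once the catalog is created, Iceberg tables behind the broker are queried like any other table. A hypothetical usage sketch follows; the catalog, database, and table names are placeholders, not values from this PR.

```sql
SWITCH iceberg_hms;
-- The scan below is routed through the bound broker via IcebergBrokerIO.
SELECT count(*) FROM sample_db.sample_iceberg_table;
```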

## Use Ranger for Authorization

Apache Ranger is a security framework for monitoring, enabling services, and managing comprehensive data security access on the Hadoop platform.
@@ -0,0 +1,74 @@ (new file: BrokerInputFile.java)
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.datasource.iceberg.broker;

import org.apache.doris.analysis.BrokerDesc;
import org.apache.doris.common.util.BrokerReader;
import org.apache.doris.thrift.TBrokerFD;

import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.SeekableInputStream;

import java.io.IOException;

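/**
 * Iceberg {@link InputFile} implementation backed by a Doris broker. The broker connection
 * is opened eagerly in {@link #create(String, BrokerDesc)}, so construction fails with an
 * {@link IOException} if the file cannot be opened, and {@code exists()} is true for every
 * successfully created instance.
 */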
public class BrokerInputFile implements InputFile {

    private Long fileLength = null;

    private final String filePath;
    private final BrokerDesc brokerDesc;
    private BrokerReader reader;
    private TBrokerFD fd;

    private BrokerInputFile(String filePath, BrokerDesc brokerDesc) {
        this.filePath = filePath;
        this.brokerDesc = brokerDesc;
    }

    private void init() throws IOException {
        this.reader = BrokerReader.create(this.brokerDesc);
        this.fileLength = this.reader.getFileLength(filePath);
        this.fd = this.reader.open(filePath);
    }

    public static BrokerInputFile create(String filePath, BrokerDesc brokerDesc) throws IOException {
        BrokerInputFile inputFile = new BrokerInputFile(filePath, brokerDesc);
        inputFile.init();
        return inputFile;
    }

    @Override
    public long getLength() {
        return fileLength;
    }

    @Override
    public SeekableInputStream newStream() {
        return new BrokerInputStream(this.reader, this.fd, this.fileLength);
    }

    @Override
    public String location() {
        return filePath;
    }

    @Override
    public boolean exists() {
        return fileLength != null;
    }
}
@@ -0,0 +1,169 @@ (new file: BrokerInputStream.java)
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.datasource.iceberg.broker;

import org.apache.doris.common.util.BrokerReader;
import org.apache.doris.thrift.TBrokerFD;

import org.apache.iceberg.io.SeekableInputStream;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.IOException;

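/**
 * A {@link SeekableInputStream} that reads file data through a Doris broker. Byte-level and
 * small ranged reads are served from a 1 MB read-ahead buffer covering the byte range
 * [bufferOffset, bufferLimit); requests larger than the buffer go directly to the broker.
 */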
public class BrokerInputStream extends SeekableInputStream {
    private static final Logger LOG = LogManager.getLogger(BrokerInputStream.class);
    private static final int COPY_BUFFER_SIZE = 1024 * 1024; // 1MB

    private final byte[] tmpBuf = new byte[COPY_BUFFER_SIZE];
    private long currentPos = 0;
    private long markPos = 0;

    private long bufferOffset = 0;
    private long bufferLimit = 0;
    private final BrokerReader reader;
    private final TBrokerFD fd;
    private final long fileLength;

    public BrokerInputStream(BrokerReader reader, TBrokerFD fd, long fileLength) {
        this.fd = fd;
        this.reader = reader;
        this.fileLength = fileLength;
    }

    @Override
    public long getPos() throws IOException {
        return currentPos;
    }

    @Override
    public void seek(long newPos) throws IOException {
        currentPos = newPos;
    }

    @Override
    public int read() throws IOException {
        try {
            // Refill when the position falls outside the buffered range [bufferOffset, bufferLimit).
            // Using '>=' (bufferLimit is exclusive) also covers the initial empty buffer and avoids
            // reading one byte past the valid data when currentPos == bufferLimit.
            if (currentPos < bufferOffset || currentPos >= bufferLimit) {
                bufferOffset = currentPos;
                fill();
            }
            if (currentPos > bufferLimit) {
                LOG.warn("current pos {} is larger than buffer limit {}."
                        + " should not happen.", currentPos, bufferLimit);
                return -1;
            }

            int pos = (int) (currentPos - bufferOffset);
            int res = Byte.toUnsignedInt(tmpBuf[pos]);
            ++currentPos;
            return res;
        } catch (BrokerReader.EOFException e) {
            return -1;
        }
    }

    @SuppressWarnings("NullableProblems")
    @Override
    public int read(byte[] b) throws IOException {
        try {
            // Bypass the read-ahead buffer and fetch b.length bytes directly from the broker.
            byte[] data = reader.pread(fd, currentPos, b.length);
            System.arraycopy(data, 0, b, 0, data.length);
            currentPos += data.length;
            return data.length;
        } catch (BrokerReader.EOFException e) {
            return -1;
        }
    }

    @SuppressWarnings("NullableProblems")
    @Override
    public int read(byte[] b, int off, int len) throws IOException {
        try {
            if (currentPos < bufferOffset || currentPos > bufferLimit || currentPos + len > bufferLimit) {
                if (len > COPY_BUFFER_SIZE) {
                    // The data to be read is larger than the max size of the buffer.
                    // Read it directly from the broker.
                    byte[] data = reader.pread(fd, currentPos, len);
                    System.arraycopy(data, 0, b, off, data.length);
                    currentPos += data.length;
                    return data.length;
                }
                // Fill the buffer starting at the current position first.
                bufferOffset = currentPos;
                fill();
            }

            if (currentPos > bufferLimit) {
                LOG.warn("current pos {} is larger than buffer limit {}."
                        + " should not happen.", currentPos, bufferLimit);
                return -1;
            }

            // After a refill, currentPos == bufferOffset; otherwise the guard above ensured
            // currentPos + len <= bufferLimit, so the copy stays within the valid buffered data.
            int start = (int) (currentPos - bufferOffset);
            int readLen = Math.min(len, (int) (bufferLimit - bufferOffset));
            System.arraycopy(tmpBuf, start, b, off, readLen);
            currentPos += readLen;
            return readLen;
        } catch (BrokerReader.EOFException e) {
            return -1;
        }
    }

    // Read up to COPY_BUFFER_SIZE bytes at bufferOffset into tmpBuf;
    // throws EOFException when the offset has reached the end of the file.
    private void fill() throws IOException, BrokerReader.EOFException {
        if (bufferOffset == this.fileLength) {
            throw new BrokerReader.EOFException();
        }
        byte[] data = reader.pread(fd, bufferOffset, COPY_BUFFER_SIZE);
        System.arraycopy(data, 0, tmpBuf, 0, data.length);
        bufferLimit = bufferOffset + data.length;
    }

    @Override
    public long skip(long n) throws IOException {
        final long left = fileLength - currentPos;
        long min = Math.min(n, left);
        currentPos += min;
        return min;
    }

    @Override
    public int available() throws IOException {
        return 0;
    }

    @Override
    public void close() throws IOException {
        reader.close(fd);
    }

    @Override
    public synchronized void mark(int readlimit) {
        markPos = currentPos;
    }

    @Override
    public synchronized void reset() throws IOException {
        currentPos = markPos;
    }

    @Override
    public boolean markSupported() {
        return true;
    }
}
@@ -0,0 +1,80 @@ (new file: IcebergBrokerIO.java)
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.datasource.iceberg.broker;

import org.apache.doris.analysis.BrokerDesc;
import org.apache.doris.datasource.HMSExternalCatalog;

import com.google.common.collect.ImmutableMap;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.util.SerializableMap;

import java.io.IOException;
import java.util.Map;

/**
 * FileIO implementation that uses a Doris broker to execute Iceberg file I/O operations.
 */
public class IcebergBrokerIO implements FileIO {

    private SerializableMap<String, String> properties = SerializableMap.copyOf(ImmutableMap.of());
    private BrokerDesc brokerDesc = null;

    @Override
    public void initialize(Map<String, String> props) {
        this.properties = SerializableMap.copyOf(props);
        // A broker must be bound to the catalog before this FileIO can be used.
        if (!properties.containsKey(HMSExternalCatalog.BIND_BROKER_NAME)) {
            throw new UnsupportedOperationException(String.format("No broker is specified, "
                    + "try to set '%s' in HMS Catalog", HMSExternalCatalog.BIND_BROKER_NAME));
        }
        String brokerName = properties.get(HMSExternalCatalog.BIND_BROKER_NAME);
        this.brokerDesc = new BrokerDesc(brokerName, properties.immutableMap());
    }

    @Override
    public Map<String, String> properties() {
        return properties.immutableMap();
    }

    @Override
    public InputFile newInputFile(String path) {
        if (brokerDesc == null) {
            throw new UnsupportedOperationException("IcebergBrokerIO should be initialized first");
        }
        try {
            return BrokerInputFile.create(path, brokerDesc);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    // Broker access is read-only: writes and deletes are intentionally unsupported.
    @Override
    public OutputFile newOutputFile(String path) {
        throw new UnsupportedOperationException("IcebergBrokerIO does not support writing files");
    }

    @Override
    public void deleteFile(String path) {
        throw new UnsupportedOperationException("IcebergBrokerIO does not support deleting files");
    }

    @Override
    public void close() { }
}
@@ -34,10 +34,12 @@
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.BrokerUtil;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.HMSExternalCatalog;
import org.apache.doris.datasource.hive.AcidInfo;
import org.apache.doris.datasource.hive.AcidInfo.DeleteDeltaInfo;
import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.planner.external.iceberg.IcebergSplit;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.spi.Split;
import org.apache.doris.statistics.StatisticalType;
@@ -313,7 +315,13 @@ public void createScanRangeLocations() throws UserException {
        List<String> pathPartitionKeys = getPathPartitionKeys();
        for (Split split : inputSplits) {
            FileSplit fileSplit = (FileSplit) split;
            TFileType locationType;
            // Iceberg splits bound to a broker must be read through the broker;
            // all other splits infer the file type from the path scheme as before.
            if (fileSplit instanceof IcebergSplit
                    && ((IcebergSplit) fileSplit).getConfig().containsKey(HMSExternalCatalog.BIND_BROKER_NAME)) {
                locationType = TFileType.FILE_BROKER;
            } else {
                locationType = getLocationType(fileSplit.getPath().toString());
            }

            TScanRangeLocations curLocations = newLocations();
            // If fileSplit has partition values, use the values collected from hive partitions.