Add support for ingesting older iceberg snapshots #15348

Merged · 4 commits · Nov 17, 2023
12 changes: 7 additions & 5 deletions docs/ingestion/input-sources.md
@@ -858,7 +858,8 @@ The following is a sample spec for a HDFS warehouse source:
},
"warehouseSource": {
"type": "hdfs"
}
},
"snapshotTime": "2023-06-01T00:00:00.000Z",
},
"inputFormat": {
"type": "parquet"
@@ -937,10 +938,11 @@ The following is a sample spec for a S3 warehouse source:
|--------|-----------|---------|
|type|Set the value to `iceberg`.|yes|
|tableName|The Iceberg table name configured in the catalog.|yes|
|namespace|The Iceberg namespace associated with the table|yes|
|icebergFilter|The JSON Object that filters data files within a snapshot|no|
|icebergCatalog|The JSON Object used to define the catalog that manages the configured Iceberg table|yes|
|warehouseSource|The JSON Object that defines the native input source for reading the data files from the warehouse|yes|
|namespace|The Iceberg namespace associated with the table.|yes|
|icebergFilter|The JSON Object that filters data files within a snapshot.|no|
|icebergCatalog|The JSON Object used to define the catalog that manages the configured Iceberg table.|yes|
|warehouseSource|The JSON Object that defines the native input source for reading the data files from the warehouse.|yes|
|snapshotTime|Timestamp in ISO8601 DateTime format that will be used to fetch the most recent snapshot as of this time.|no|

### Catalog Object

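For intuition, the selection rule behind `snapshotTime` is: among a table's snapshots, use the most recent one whose commit timestamp falls at or before the given instant. A minimal sketch of that rule in plain Java (illustrative names, independent of the Iceberg API):

```java
import java.util.List;
import java.util.Optional;

class SnapshotSelectionSketch
{
  // Given snapshot commit times (epoch millis, ascending), pick the most
  // recent one committed at or before asOfMillis; empty means none qualifies.
  static Optional<Long> snapshotAsOf(List<Long> commitTimesMillis, long asOfMillis)
  {
    return commitTimesMillis.stream()
                            .filter(t -> t <= asOfMillis)
                            .max(Long::compare);
  }

  public static void main(String[] args)
  {
    List<Long> commits = List.of(1_000L, 2_000L, 3_000L);
    System.out.println(snapshotAsOf(commits, 2_500L)); // Optional[2000]
    System.out.println(snapshotAsOf(commits, 500L));   // Optional.empty
  }
}
```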
2 changes: 1 addition & 1 deletion extensions-contrib/druid-iceberg-extensions/pom.xml
@@ -35,7 +35,7 @@
<modelVersion>4.0.0</modelVersion>

<properties>
<iceberg.core.version>1.4.0</iceberg.core.version>
<iceberg.core.version>1.4.1</iceberg.core.version>
<hive.version>3.1.3</hive.version>
</properties>
<dependencies>
IcebergCatalog.java
@@ -32,6 +32,7 @@
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.io.CloseableIterable;
import org.joda.time.DateTime;

import java.util.ArrayList;
import java.util.List;
@@ -54,12 +55,15 @@ public abstract class IcebergCatalog
*
* @param tableNamespace The catalog namespace under which the table is defined
* @param tableName The iceberg table name
* @param icebergFilter The iceberg filter that needs to be applied before reading the files
* @param snapshotTime Datetime that will be used to fetch the most recent snapshot as of this time
* @return a list of data file paths
*/
public List<String> extractSnapshotDataFiles(
String tableNamespace,
String tableName,
IcebergFilter icebergFilter
IcebergFilter icebergFilter,
DateTime snapshotTime
)
{
Catalog catalog = retrieveCatalog();
@@ -85,7 +89,9 @@ public List<String> extractSnapshotDataFiles(
if (icebergFilter != null) {
tableScan = icebergFilter.filter(tableScan);
}

if (snapshotTime != null) {
tableScan = tableScan.asOfTime(snapshotTime.getMillis());
}
CloseableIterable<FileScanTask> tasks = tableScan.planFiles();
CloseableIterable.transform(tasks, FileScanTask::file)
.forEach(dataFile -> dataFilePaths.add(dataFile.path().toString()));
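Standalone, the time-travel branch added above corresponds to Iceberg's scan API roughly as follows — a hedged sketch, assuming a `Table` has already been loaded from a catalog:

```java
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.io.CloseableIterable;
import org.joda.time.DateTime;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

class TimeTravelScanSketch
{
  // Mirrors extractSnapshotDataFiles(): plan the data files visible in the
  // most recent snapshot committed at or before snapshotTime.
  static List<String> dataFilePathsAsOf(Table table, DateTime snapshotTime) throws IOException
  {
    TableScan tableScan = table.newScan();
    if (snapshotTime != null) {
      // asOfTime() takes epoch millis; if no snapshot exists at or before
      // this instant, Iceberg raises an error instead of reading the latest.
      tableScan = tableScan.asOfTime(snapshotTime.getMillis());
    }
    List<String> dataFilePaths = new ArrayList<>();
    try (CloseableIterable<FileScanTask> tasks = tableScan.planFiles()) {
      tasks.forEach(task -> dataFilePaths.add(task.file().path().toString()));
    }
    return dataFilePaths;
  }
}
```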
IcebergInputSource.java
@@ -36,6 +36,7 @@
import org.apache.druid.iceberg.filter.IcebergFilter;
import org.apache.druid.java.util.common.CloseableIterators;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.joda.time.DateTime;

import javax.annotation.Nullable;
import java.io.File;
@@ -68,6 +69,9 @@ public class IcebergInputSource implements SplittableInputSource<List<String>>
@JsonProperty
private InputSourceFactory warehouseSource;

@JsonProperty
private final DateTime snapshotTime;

private boolean isLoaded = false;

private SplittableInputSource delegateInputSource;
@@ -78,14 +82,16 @@ public IcebergInputSource(
@JsonProperty("namespace") String namespace,
@JsonProperty("icebergFilter") @Nullable IcebergFilter icebergFilter,
@JsonProperty("icebergCatalog") IcebergCatalog icebergCatalog,
@JsonProperty("warehouseSource") InputSourceFactory warehouseSource
@JsonProperty("warehouseSource") InputSourceFactory warehouseSource,
@JsonProperty("snapshotTime") @Nullable DateTime snapshotTime
)
{
this.tableName = Preconditions.checkNotNull(tableName, "tableName cannot be null");
this.namespace = Preconditions.checkNotNull(namespace, "namespace cannot be null");
this.icebergCatalog = Preconditions.checkNotNull(icebergCatalog, "icebergCatalog cannot be null");
this.icebergFilter = icebergFilter;
this.warehouseSource = Preconditions.checkNotNull(warehouseSource, "warehouseSource cannot be null");
this.snapshotTime = snapshotTime;
}

@Override
@@ -164,6 +170,13 @@ public IcebergFilter getIcebergFilter()
return icebergFilter;
}

@Nullable
@JsonProperty
Reviewer comment (Contributor): I generally mark the getter nullable as well.

public DateTime getSnapshotTime()
{
return snapshotTime;
}

public SplittableInputSource getDelegateInputSource()
{
return delegateInputSource;
@@ -174,7 +187,8 @@ protected void retrieveIcebergDatafiles()
List<String> snapshotDataFiles = icebergCatalog.extractSnapshotDataFiles(
getNamespace(),
getTableName(),
getIcebergFilter()
getIcebergFilter(),
getSnapshotTime()
);
if (snapshotDataFiles.isEmpty()) {
delegateInputSource = new EmptyInputSource();
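For reference, the `snapshotTime` string in an ingestion spec reaches this constructor through Jackson. A hedged sketch of the binding using the jackson-datatype-joda module (Druid's injected object mappers do their own equivalent DateTime wiring):

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.joda.JodaModule;
import org.joda.time.DateTime;

class SnapshotTimeBindingSketch
{
  public static void main(String[] args) throws Exception
  {
    ObjectMapper mapper = new ObjectMapper().registerModule(new JodaModule());
    // An ISO 8601 spec value deserializes into the @JsonProperty("snapshotTime")
    // DateTime argument; an absent field yields null, which the input source
    // treats as "read the latest snapshot".
    DateTime snapshotTime = mapper.readValue("\"2023-06-01T00:00:00.000Z\"", DateTime.class);
    System.out.println(snapshotTime.getMillis()); // 1685577600000
  }
}
```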
IcebergInputSourceTest.java
Expand Up @@ -26,6 +26,7 @@
import org.apache.druid.data.input.impl.LocalInputSource;
import org.apache.druid.data.input.impl.LocalInputSourceFactory;
import org.apache.druid.iceberg.filter.IcebergEqualsFilter;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.Files;
@@ -43,7 +44,9 @@
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.types.Types;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
@@ -62,32 +65,38 @@ public class IcebergInputSourceTest
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();

IcebergCatalog testCatalog;
private IcebergCatalog testCatalog;
private TableIdentifier tableIdentifier;

Schema tableSchema = new Schema(
private Schema tableSchema = new Schema(
Types.NestedField.required(1, "id", Types.StringType.get()),
Types.NestedField.required(2, "name", Types.StringType.get())
);
Map<String, Object> tableData = ImmutableMap.of("id", "123988", "name", "Foo");
private Map<String, Object> tableData = ImmutableMap.of("id", "123988", "name", "Foo");

private static final String NAMESPACE = "default";
private static final String TABLENAME = "foosTable";

@Test
public void testInputSource() throws IOException
@Before
public void setup() throws IOException
{
final File warehouseDir = FileUtils.createTempDir();
testCatalog = new LocalCatalog(warehouseDir.getPath(), new HashMap<>());
TableIdentifier tableIdentifier = TableIdentifier.of(Namespace.of(NAMESPACE), TABLENAME);
tableIdentifier = TableIdentifier.of(Namespace.of(NAMESPACE), TABLENAME);

createAndLoadTable(tableIdentifier);
}

@Test
public void testInputSource() throws IOException
{
IcebergInputSource inputSource = new IcebergInputSource(
TABLENAME,
NAMESPACE,
null,
testCatalog,
new LocalInputSourceFactory()
new LocalInputSourceFactory(),
null
);
Stream<InputSplit<List<String>>> splits = inputSource.createSplits(null, new MaxSizeSplitHintSpec(null, null));
List<File> localInputSourceList = splits.map(inputSource::withSplit)
@@ -111,45 +120,33 @@ public void testInputSource() throws IOException
Assert.assertEquals(tableData.get("id"), record.get(0));
Assert.assertEquals(tableData.get("name"), record.get(1));
}
dropTableFromCatalog(tableIdentifier);
}

@Test
public void testInputSourceWithEmptySource() throws IOException
{
final File warehouseDir = FileUtils.createTempDir();
testCatalog = new LocalCatalog(warehouseDir.getPath(), new HashMap<>());
TableIdentifier tableIdentifier = TableIdentifier.of(Namespace.of(NAMESPACE), TABLENAME);

createAndLoadTable(tableIdentifier);

IcebergInputSource inputSource = new IcebergInputSource(
TABLENAME,
NAMESPACE,
new IcebergEqualsFilter("id", "0000"),
testCatalog,
new LocalInputSourceFactory()
new LocalInputSourceFactory(),
null
);
Stream<InputSplit<List<String>>> splits = inputSource.createSplits(null, new MaxSizeSplitHintSpec(null, null));
Assert.assertEquals(0, splits.count());
dropTableFromCatalog(tableIdentifier);
}

@Test
public void testInputSourceWithFilter() throws IOException
{
final File warehouseDir = FileUtils.createTempDir();
testCatalog = new LocalCatalog(warehouseDir.getPath(), new HashMap<>());
TableIdentifier tableIdentifier = TableIdentifier.of(Namespace.of(NAMESPACE), TABLENAME);

createAndLoadTable(tableIdentifier);

IcebergInputSource inputSource = new IcebergInputSource(
TABLENAME,
NAMESPACE,
new IcebergEqualsFilter("id", "123988"),
testCatalog,
new LocalInputSourceFactory()
new LocalInputSourceFactory(),
null
);
Stream<InputSplit<List<String>>> splits = inputSource.createSplits(null, new MaxSizeSplitHintSpec(null, null));
List<File> localInputSourceList = splits.map(inputSource::withSplit)
@@ -173,6 +170,26 @@ public void testInputSourceWithFilter() throws IOException
Assert.assertEquals(tableData.get("id"), record.get(0));
Assert.assertEquals(tableData.get("name"), record.get(1));
}
}

@Test
public void testInputSourceReadFromLatestSnapshot() throws IOException
{
IcebergInputSource inputSource = new IcebergInputSource(
TABLENAME,
NAMESPACE,
null,
testCatalog,
new LocalInputSourceFactory(),
DateTimes.nowUtc()
);
Stream<InputSplit<List<String>>> splits = inputSource.createSplits(null, new MaxSizeSplitHintSpec(null, null));
Assert.assertEquals(1, splits.count());
}

@After
public void tearDown()
{
dropTableFromCatalog(tableIdentifier);
}

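The new test pins the scan to `DateTimes.nowUtc()`, which resolves to the latest snapshot. For an older-snapshot case, a sketch in the same fixture might look like this — `appendRowsToTable` is hypothetical, standing in for a helper that performs a second commit on the table:

```java
// Hypothetical addition to IcebergInputSourceTest, not part of this PR.
@Test
public void testInputSourceReadFromOlderSnapshot() throws IOException
{
  DateTime afterFirstCommit = DateTimes.nowUtc(); // setup() committed once already
  appendRowsToTable(tableIdentifier);             // hypothetical helper: second commit
  IcebergInputSource inputSource = new IcebergInputSource(
      TABLENAME,
      NAMESPACE,
      null,
      testCatalog,
      new LocalInputSourceFactory(),
      afterFirstCommit
  );
  Stream<InputSplit<List<String>>> splits =
      inputSource.createSplits(null, new MaxSizeSplitHintSpec(null, null));
  // Only the data file from the first snapshot should be planned.
  Assert.assertEquals(1, splits.count());
}
```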
1 change: 1 addition & 0 deletions website/.spelling
@@ -1281,6 +1281,7 @@ numShards
IngestSegment
IngestSegmentFirehose
maxSizes
snapshotTime
windowPeriod
2012-01-01T00
2012-01-03T00