- Extend DDBDeserializer, implementing structType()
- Create the DDBJobConf indicating the DynamoDB details
- Use the DatasetDDBReader function
public class FilmDeserializer extends DDBDeserializer {
    @Override
    public StructType structType() {
        return DataTypes.createStructType(new StructField[]{
            DataTypes.createStructField("Film", DataTypes.StringType, false),
            DataTypes.createStructField("Genre", DataTypes.StringType, false)
        });
    }
}
DDBJobConf ddbconf = DDBJobConf.Builder.newInstance()
        .setDdbRegion(region)
        .setWritePercent(writePercent)
        .setWriteThroughput(writeThroughput)
        .setOutputTableName(outputTableName)
        .setDdbEndpoint(endpoint)
        .create();
Dataset<Row> dataset = new DatasetDDBReader().read(new FilmDeserializer(), ddbconf, session);
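Conceptually, the deserializer's job is to turn each DynamoDB item (an attribute-name to value map) into a row whose columns match structType(). A dependency-free sketch of that mapping, assuming string attributes (plain Java; the class and method names here are illustration only, not the connector's API):

```java
import java.util.*;

public class ItemToRowSketch {
    // Column order must match the StructType declared in FilmDeserializer.
    static final List<String> COLUMNS = List.of("Film", "Genre");

    // Turn one DynamoDB-style item into an ordered row of values,
    // null-filling any attribute the item does not carry.
    static List<String> toRow(Map<String, String> item) {
        List<String> row = new ArrayList<>();
        for (String col : COLUMNS) {
            row.add(item.get(col));
        }
        return row;
    }

    public static void main(String[] args) {
        Map<String, String> item = Map.of("Film", "Youth in Revolt", "Genre", "Comedy");
        System.out.println(toRow(item)); // [Youth in Revolt, Comedy]
    }
}
```

The real reader does the equivalent per item while scanning the table, emitting Spark Rows instead of plain lists.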
- Extend DDBSerializer, implementing fieldMap() to list each column name and its specific type
- Create the DDBJobConf indicating the DynamoDB details
- Read the input Dataset and pass it to the DatasetDDBWriter function
public class FilmSerializer extends DDBSerializer {
    @Override
    protected List<DDBField> fieldMap() {
        return Arrays.asList(
            new DDBField("Film", StringType.class),
            new DDBField("Genre", StringType.class),
            new DDBField("Lead Studio", StringType.class),
            new DDBField("Audience score %", IntegerType.class),
            new DDBField("Profitability", DecimalType.class),
            new DDBField("Rotten Tomatoes %", StringType.class),
            new DDBField("Worldwide Gross", StringType.class),
            new DDBField("Year", IntegerType.class),
            new DDBField("Actors", StringType.class)
        );
    }
}
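The field list above is essentially an ordered mapping from Dataset column names to their types. A dependency-free sketch of that idea, including the one sanity check a serializer implicitly relies on, no duplicate column names (plain Java; these names are hypothetical stand-ins, not the connector's DDBField):

```java
import java.util.*;

public class FieldMapSketch {
    // Hypothetical stand-in for DDBField: an ordered pair of
    // column name and type name.
    record Field(String name, String type) {}

    static List<Field> declaredColumns() {
        return List.of(
            new Field("Film", "StringType"),
            new Field("Genre", "StringType"),
            new Field("Year", "IntegerType")
        );
    }

    // The serializer needs exactly one entry per Dataset column;
    // reject a list that declares the same column twice.
    static int validate(List<Field> fields) {
        Set<String> seen = new HashSet<>();
        for (Field f : fields) {
            if (!seen.add(f.name())) {
                throw new IllegalStateException("Duplicate column: " + f.name());
            }
        }
        return fields.size();
    }

    public static void main(String[] args) {
        System.out.println(validate(declaredColumns()) + " columns declared");
    }
}
```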
DDBJobConf ddbconf = DDBJobConf.Builder.newInstance()
        .setDdbRegion(region)
        .setWritePercent(writePercent)
        .setWriteThroughput(writeThroughput)
        .setOutputTableName(outputTableName)
        .setDdbEndpoint(endpoint)
        .create();
session = SparkSession.builder().master("local").getOrCreate();
inputDataset = session.read().option("header", "true").csv("./film.csv");
new DatasetDDBWriter().write(new FilmSerializer(), ddbconf, inputDataset);