Refactoring io.delta package to io.delta.tables
Staging this PR for now.
Changing the namespace `io.delta` to `io.delta.tables`

No new tests added; re-ran the existing tests.

Author: Rahul Mahadev <[email protected]>

GitOrigin-RevId: ab6a5968a29f698426bea27db0f33f431da637ef
rahulsmahadev authored and tdas committed Aug 1, 2019
1 parent df0393e commit e75c8d1
Showing 14 changed files with 49 additions and 49 deletions.
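
The user-visible effect of this rename is confined to package names: imports and fully qualified references change, while the DeltaTable API surface itself is untouched. A minimal before/after sketch of caller code (the session setup, table path, and delete predicate are illustrative, not taken from this commit):

// Before this commit the Scala/Java entry point lived in io.delta:
//   import io.delta.DeltaTable
// After it, the same class lives in io.delta.tables:
import io.delta.tables.DeltaTable

import org.apache.spark.sql.SparkSession

object DeltaTablesRenameExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("delta-tables-rename-example") // hypothetical app name
      .getOrCreate()

    // Same factory method as before; only the package changed.
    // "/tmp/delta/events" is a hypothetical table path.
    val table: DeltaTable = DeltaTable.forPath(spark, "/tmp/delta/events")

    // Fluent operations (delete, update, vacuum, history, merge) keep
    // their signatures, e.g. a conditional delete:
    table.delete("key = 1")

    spark.stop()
  }
}

Code that still compiles against io.delta.DeltaTable (or the internal io.delta.execution package) needs only its imports updated, as the diffs below show.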
6 changes: 3 additions & 3 deletions build.sbt
@@ -90,14 +90,14 @@ enablePlugins(GenJavadocPlugin, JavaUnidocPlugin, ScalaUnidocPlugin)

// Configure Scala unidoc
scalacOptions in(ScalaUnidoc, unidoc) ++= Seq(
"-skip-packages", "org:com:io.delta.execution",
"-skip-packages", "org:com:io.delta.tables.execution",
"-doc-title", "Delta Lake " + version.value.replaceAll("-SNAPSHOT", "") + " ScalaDoc"
)

// Configure Java unidoc
javacOptions in(JavaUnidoc, unidoc) := Seq(
"-public",
"-exclude", "org:com:io.delta.execution",
"-exclude", "org:com:io.delta.tables.execution",
"-windowtitle", "Delta Lake " + version.value.replaceAll("-SNAPSHOT", "") + " JavaDoc",
"-noqualifier", "java.lang",
"-tag", "return:X"
@@ -107,7 +107,7 @@ javacOptions in(JavaUnidoc, unidoc) := Seq(
def ignoreUndocumentedPackages(packages: Seq[Seq[java.io.File]]): Seq[Seq[java.io.File]] = {
packages
.map(_.filterNot(_.getName.contains("$")))
- .map(_.filterNot(_.getCanonicalPath.contains("io/delta/execution")))
+ .map(_.filterNot(_.getCanonicalPath.contains("io/delta/tables/execution")))
.map(_.filterNot(_.getCanonicalPath.contains("spark")))
}

@@ -14,7 +14,7 @@
* limitations under the License.
*/

- package io.delta
+ package io.delta.tables

import scala.collection.JavaConverters._
import scala.collection.Map
@@ -14,12 +14,12 @@
* limitations under the License.
*/

- package io.delta
+ package io.delta.tables

import scala.collection.JavaConverters._

import org.apache.spark.sql.delta._
- import io.delta.execution._
+ import io.delta.tables.execution._
import org.apache.hadoop.fs.Path

import org.apache.spark.annotation.InterfaceStability._
@@ -14,15 +14,15 @@
* limitations under the License.
*/

- package io.delta.execution
+ package io.delta.tables.execution

import scala.collection.Map

import org.apache.spark.sql.delta.PreprocessTableUpdate
import org.apache.spark.sql.delta.{DeltaErrors, DeltaFullTable, DeltaHistoryManager, DeltaLog}
import org.apache.spark.sql.delta.commands.{DeleteCommand, VacuumCommand}
import org.apache.spark.sql.delta.util.AnalysisHelper
- import io.delta.DeltaTable
+ import io.delta.tables.DeltaTable

import org.apache.spark.sql.{functions, Column, DataFrame, SparkSession}
import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
@@ -14,7 +14,7 @@
* limitations under the License.
*/

- package io.delta;
+ package io.delta.tables;

import java.util.Arrays;
import java.util.List;
@@ -22,7 +22,7 @@

import scala.Tuple2;

- import io.delta.DeltaTable;
+ import io.delta.tables.DeltaTable;
import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
@@ -24,7 +24,7 @@

import scala.Tuple2;

- import io.delta.DeltaTable;
+ import io.delta.tables.DeltaTable;

import org.apache.spark.sql.*;
import org.apache.spark.util.Utils;
@@ -20,7 +20,7 @@

import scala.Tuple2;

- import io.delta.DeltaTable;
+ import io.delta.tables.DeltaTable;
import org.junit.*;

import org.apache.spark.sql.*;
@@ -14,7 +14,7 @@
* limitations under the License.
*/

- package io.delta
+ package io.delta.tables

import java.util.Locale

14 changes: 7 additions & 7 deletions src/test/scala/org/apache/spark/sql/delta/DeleteScalaSuite.scala
@@ -16,7 +16,7 @@

package org.apache.spark.sql.delta

- import io.delta.DeltaTable
+ import io.delta.tables.DeltaTable

import org.apache.spark.sql.{functions, Row}

@@ -26,21 +26,21 @@ class DeleteScalaSuite extends DeleteSuiteBase {

test("delete usage test - without condition") {
append(Seq((1, 10), (2, 20), (3, 30), (4, 40)).toDF("key", "value"))
- val table = io.delta.DeltaTable.forPath(tempPath)
+ val table = io.delta.tables.DeltaTable.forPath(tempPath)
table.delete()
checkAnswer(readDeltaTable(tempPath), Nil)
}

test("delete usage test - with condition") {
append(Seq((1, 10), (2, 20), (3, 30), (4, 40)).toDF("key", "value"))
- val table = io.delta.DeltaTable.forPath(tempPath)
+ val table = io.delta.tables.DeltaTable.forPath(tempPath)
table.delete("key = 1 or key = 2")
checkAnswer(readDeltaTable(tempPath), Row(3, 30) :: Row(4, 40) :: Nil)
}

test("delete usage test - with Column condition") {
append(Seq((1, 10), (2, 20), (3, 30), (4, 40)).toDF("key", "value"))
- val table = io.delta.DeltaTable.forPath(tempPath)
+ val table = io.delta.tables.DeltaTable.forPath(tempPath)
table.delete(functions.expr("key = 1 or key = 2"))
checkAnswer(readDeltaTable(tempPath), Row(3, 30) :: Row(4, 40) :: Nil)
}
@@ -62,14 +62,14 @@ class DeleteScalaSuite extends DeleteSuiteBase {
}
}

- val deltaTable: io.delta.DeltaTable = {
+ val deltaTable: DeltaTable = {
val (tableNameOrPath, optionalAlias) = parse(target)
val isPath: Boolean = tableNameOrPath.startsWith("delta.")
val table = if (isPath) {
val path = tableNameOrPath.stripPrefix("delta.`").stripSuffix("`")
- io.delta.DeltaTable.forPath(spark, path)
+ io.delta.tables.DeltaTable.forPath(spark, path)
} else {
- io.delta.DeltaTable(spark.table(tableNameOrPath))
+ io.delta.tables.DeltaTable(spark.table(tableNameOrPath))
}
optionalAlias.map(table.as(_)).getOrElse(table)
}
@@ -387,7 +387,7 @@ trait DeltaVacuumSuiteBase extends QueryTest with SharedSQLContext with GivenWhenThen
checkDatasetUnorderly(result.as[String], qualified: _*)
case GCScalaApi(expectedDf, retention) =>
Given("*** Garbage collecting Reservoir using Scala")
- val deltaTable = io.delta.DeltaTable.forPath(spark, deltaLog.dataPath.toString)
+ val deltaTable = io.delta.tables.DeltaTable.forPath(spark, deltaLog.dataPath.toString)
val result = if (retention.isDefined) {
deltaTable.vacuum(retention.get)
} else {
@@ -52,7 +52,7 @@ trait DescribeDeltaHistorySuiteBase

def getHistory(path: String, limit: Option[Int] = None): DataFrame = {
val deltaLog = DeltaLog.forTable(spark, path)
- val deltaTable = io.delta.DeltaTable.forPath(spark, deltaLog.dataPath.toString)
+ val deltaTable = io.delta.tables.DeltaTable.forPath(spark, deltaLog.dataPath.toString)
if (limit.isDefined) {
deltaTable.history(limit.get)
} else {
@@ -159,7 +159,7 @@ trait DescribeDeltaHistorySuiteBase
val tempDir = Utils.createTempDir().toString
Seq((1, "a"), (2, "3")).toDF("id", "data").write.format("delta").partitionBy("id").save(tempDir)
val deltaLog = DeltaLog.forTable(spark, tempDir)
- val deltaTable = io.delta.DeltaTable.forPath(spark, deltaLog.dataPath.toString)
+ val deltaTable = io.delta.tables.DeltaTable.forPath(spark, deltaLog.dataPath.toString)
deltaTable.delete("id = 1")

checkLastOperation(
36 changes: 18 additions & 18 deletions src/test/scala/org/apache/spark/sql/delta/MergeIntoScalaSuite.scala
@@ -18,7 +18,7 @@ package org.apache.spark.sql.delta

import java.util.Locale

- import io.delta._
+ import io.delta.tables._

import org.apache.spark.sql._
import org.apache.spark.sql.types.StructType
@@ -33,7 +33,7 @@ class MergeIntoScalaSuite extends MergeIntoSuiteBase {
append(Seq((1, 10), (2, 20)).toDF("key1", "value1"), Nil) // target
val source = Seq((1, 100), (3, 30)).toDF("key2", "value2") // source

- io.delta.DeltaTable.forPath(spark, tempPath)
+ io.delta.tables.DeltaTable.forPath(spark, tempPath)
.merge(source, "key1 = key2")
.whenMatched().updateExpr(Map("key1" -> "key2", "value1" -> "value2"))
.whenNotMatched().insertExpr(Map("key1" -> "key2", "value1" -> "value2"))
@@ -53,7 +53,7 @@ class MergeIntoScalaSuite extends MergeIntoSuiteBase {
append(Seq((1, 10), (2, 20), (4, 40)).toDF("key1", "value1"), Nil) // target
val source = Seq((1, 100), (3, 30), (4, 41)).toDF("key2", "value2") // source

- io.delta.DeltaTable.forPath(spark, tempPath)
+ io.delta.tables.DeltaTable.forPath(spark, tempPath)
.merge(source, "key1 = key2")
.whenMatched("key1 = 4").delete()
.whenMatched("key2 = 1").updateExpr(Map("key1" -> "key2", "value1" -> "value2"))
@@ -74,7 +74,7 @@ class MergeIntoScalaSuite extends MergeIntoSuiteBase {
append(Seq((1, 10), (2, 20), (4, 40)).toDF("key1", "value1"), Nil) // target
val source = Seq((1, 100), (3, 30), (4, 41)).toDF("key2", "value2") // source

- io.delta.DeltaTable.forPath(spark, tempPath)
+ io.delta.tables.DeltaTable.forPath(spark, tempPath)
.merge(source, functions.expr("key1 = key2"))
.whenMatched(functions.expr("key1 = 4")).delete()
.whenMatched(functions.expr("key2 = 1"))
@@ -120,7 +120,7 @@ class MergeIntoScalaSuite extends MergeIntoSuiteBase {
test("update with empty map should do nothing") {
append(Seq((1, 10), (2, 20)).toDF("trgKey", "trgValue"), Nil) // target
val source = Seq((1, 100), (3, 30)).toDF("srcKey", "srcValue") // source
- io.delta.DeltaTable.forPath(spark, tempPath)
+ io.delta.tables.DeltaTable.forPath(spark, tempPath)
.merge(source, "srcKey = trgKey")
.whenMatched().updateExpr(Map[String, String]())
.whenNotMatched().insertExpr(Map("trgKey" -> "srcKey", "trgValue" -> "srcValue"))
@@ -134,7 +134,7 @@ class MergeIntoScalaSuite extends MergeIntoSuiteBase {
Nil)

// match condition should not be ignored when map is empty
- io.delta.DeltaTable.forPath(spark, tempPath)
+ io.delta.tables.DeltaTable.forPath(spark, tempPath)
.merge(source, "srcKey = trgKey")
.whenMatched("trgKey = 1").updateExpr(Map[String, String]())
.whenMatched().delete()
@@ -152,7 +152,7 @@ class MergeIntoScalaSuite extends MergeIntoSuiteBase {
append(Seq((1, 10), (2, 20)).toDF("trgKey", "trgValue"), Nil) // target
val source = Seq((1, 100), (3, 30)).toDF("srcKey", "srcValue") // source
val e = intercept[AnalysisException] {
- io.delta.DeltaTable.forPath(spark, tempPath)
+ io.delta.tables.DeltaTable.forPath(spark, tempPath)
.merge(source, "srcKey = trgKey")
.whenMatched().updateExpr(Map("trgKey" -> "srcKey", "trgValue" -> "srcValue"))
.whenNotMatched().insertExpr(Map[String, String]())
@@ -169,7 +169,7 @@ class MergeIntoScalaSuite extends MergeIntoSuiteBase {

// There must be at least one WHEN clause in a MERGE statement
var e = intercept[AnalysisException] {
- io.delta.DeltaTable.forPath(spark, tempPath)
+ io.delta.tables.DeltaTable.forPath(spark, tempPath)
.merge(source, "srcKey = trgKey")
.execute()
}
@@ -178,7 +178,7 @@ class MergeIntoScalaSuite extends MergeIntoSuiteBase {
// When there are 2 MATCHED clauses in a MERGE statement,
// the first MATCHED clause must have a condition
e = intercept[AnalysisException] {
- io.delta.DeltaTable.forPath(spark, tempPath)
+ io.delta.tables.DeltaTable.forPath(spark, tempPath)
.merge(source, "srcKey = trgKey")
.whenMatched().delete()
.whenMatched("trgKey = 1").updateExpr(Map("trgKey" -> "srcKey", "trgValue" -> "srcValue"))
@@ -189,7 +189,7 @@ class MergeIntoScalaSuite extends MergeIntoSuiteBase {

// There must be at most two WHEN clauses in a MERGE statement
e = intercept[AnalysisException] {
- io.delta.DeltaTable.forPath(spark, tempPath)
+ io.delta.tables.DeltaTable.forPath(spark, tempPath)
.merge(source, "srcKey = trgKey")
.whenMatched("trgKey = 1").updateExpr(Map("trgKey" -> "srcKey", "trgValue" -> "srcValue"))
.whenMatched("trgValue = 3").delete()
@@ -202,7 +202,7 @@ class MergeIntoScalaSuite extends MergeIntoSuiteBase {

// INSERT can appear at most once in NOT MATCHED clauses in a MERGE statement
e = intercept[AnalysisException] {
- io.delta.DeltaTable.forPath(spark, tempPath)
+ io.delta.tables.DeltaTable.forPath(spark, tempPath)
.merge(source, "srcKey = trgKey")
.whenNotMatched().insertExpr(Map("trgKey" -> "srcKey + 1", "trgValue" -> "srcValue"))
.whenNotMatched().insertExpr(Map("trgKey" -> "srcKey", "trgValue" -> "srcValue"))
@@ -213,7 +213,7 @@ class MergeIntoScalaSuite extends MergeIntoSuiteBase {

// UPDATE can appear at most once in MATCHED clauses in a MERGE statement
e = intercept[AnalysisException] {
- io.delta.DeltaTable.forPath(spark, tempPath)
+ io.delta.tables.DeltaTable.forPath(spark, tempPath)
.merge(source, "srcKey = trgKey")
.whenMatched("trgKey = 1").updateExpr(Map("trgKey" -> "srcKey", "trgValue" -> "srcValue"))
.whenMatched("trgValue = 2")
@@ -226,7 +226,7 @@ class MergeIntoScalaSuite extends MergeIntoSuiteBase {

// DELETE can appear at most once in MATCHED clauses in a MERGE statement
e = intercept[AnalysisException] {
- io.delta.DeltaTable.forPath(spark, tempPath)
+ io.delta.tables.DeltaTable.forPath(spark, tempPath)
.merge(source, "srcKey = trgKey")
.whenMatched("trgKey = 1").delete()
.whenMatched("trgValue = 2").delete()
@@ -237,7 +237,7 @@ class MergeIntoScalaSuite extends MergeIntoSuiteBase {
"INSERT, UPDATE and DELETE cannot appear twice in one MERGE query")

e = intercept[AnalysisException] {
- io.delta.DeltaTable.forPath(spark, tempPath)
+ io.delta.tables.DeltaTable.forPath(spark, tempPath)
.merge(source, "srcKey = trgKey")
.whenMatched().updateExpr(Map("trgKey" -> "srcKey", "*" -> "*"))
.whenNotMatched().insertExpr(Map("trgKey" -> "srcKey", "trgValue" -> "srcValue"))
@@ -246,7 +246,7 @@ class MergeIntoScalaSuite extends MergeIntoSuiteBase {
errorContains(e.getMessage, "cannot resolve `*`")

e = intercept[AnalysisException] {
- io.delta.DeltaTable.forPath(spark, tempPath)
+ io.delta.tables.DeltaTable.forPath(spark, tempPath)
.merge(source, "srcKey = trgKey")
.whenMatched().updateExpr(Map("trgKey" -> "srcKey", "trgValue" -> "srcValue"))
.whenNotMatched().insertExpr(Map("*" -> "*"))
@@ -394,13 +394,13 @@ class MergeIntoScalaSuite extends MergeIntoSuiteBase {
}
}

- private def makeDeltaTable(nameOrPath: String): io.delta.DeltaTable = {
+ private def makeDeltaTable(nameOrPath: String): DeltaTable = {
val isPath: Boolean = nameOrPath.startsWith("delta.")
if (isPath) {
val path = nameOrPath.stripPrefix("delta.`").stripSuffix("`")
- io.delta.DeltaTable.forPath(spark, path)
+ io.delta.tables.DeltaTable.forPath(spark, path)
} else {
- io.delta.DeltaTable(spark.table(nameOrPath))
+ io.delta.tables.DeltaTable(spark.table(nameOrPath))
}
}
