Make sources report their partitioning to Spark #176

Draft: wants to merge 20 commits into base: spark-3.2

Commits (20)
f57b868
Report partitioning to Spark to avoid shuffles
EnricoMi Mar 22, 2022
85f90e0
Completed TripleSource tests for reporting partitioning
EnricoMi Mar 23, 2022
76efb8e
Do not call into planInputPartitions(), use cached / lazy result
EnricoMi Mar 23, 2022
3a1012c
Account for adaptive Spark plans in tests
EnricoMi Mar 23, 2022
1a408b8
Rework typing in TestEdgeSource
EnricoMi Mar 23, 2022
d7ac635
Move ShuffleExchange tests into trait
EnricoMi Mar 23, 2022
2703e51
Test EdgeSource is reporting partitioning
EnricoMi Mar 23, 2022
789466c
Test NodeSource is reporting partitioning for typed nodes
EnricoMi Mar 23, 2022
8de60e1
Test NodeSource is reporting partitioning for wide nodes
EnricoMi Mar 23, 2022
005c239
Use s string interpolation rather than f
EnricoMi Mar 23, 2022
29a71d0
Disallow any predicate partitioner for wide node source
EnricoMi Mar 23, 2022
13e4fad
Make SingletonPartitioner be a PredicatePartitioner with all predicat…
EnricoMi Mar 23, 2022
39252ca
Add missing scala file, add reporting to CHANGELOG.md
EnricoMi Mar 23, 2022
d20752d
Fix compile error
EnricoMi Mar 23, 2022
d973a0a
Move AdaptiveSparkPlanExec handling into containsShuffleExchangeExec
EnricoMi Mar 24, 2022
7a6ad39
Rethinking satisfying ClusterDistribution
EnricoMi Mar 24, 2022
c364eb4
Finish TestTripleScan
EnricoMi Mar 25, 2022
71fefc3
The relaxed satisfy logic makes it impossible to test no S+P partitio…
EnricoMi Mar 27, 2022
107a757
Add partitioning reporting to features in README.md
EnricoMi Apr 5, 2022
9771957
Fix tests after rebasing with master
EnricoMi Mar 1, 2023
10 changes: 10 additions & 0 deletions CHANGELOG.md
@@ -3,6 +3,16 @@ All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).

## [UNRELEASED] - YYYY-MM-DD

### Added
- All sources report their partitioning to Spark so that Spark can exploit that partitioning and avoid unnecessary shuffles.

### Changed
- The node source in wide mode no longer allows any predicate partitioner.
- SingletonPartitioner now provides all features (filter and projection pushdown)
of predicate partitioning, but as a single partition.

## [0.9.0] - 2022-07-14

### Changed
2 changes: 2 additions & 0 deletions README.md
@@ -64,6 +64,8 @@ The connector provides the following features:
and timing of the communication to the Dgraph cluster.
- **Graph Partitioning and Streaming**: Graph data are partitioned and streamed in small chunks from Dgraph into Spark.
This guarantees that graphs of any size can be read into Spark.
- **Graph Partitioning available to Spark**: The actual partitioning of the graph data is reported to Spark. This allows
Spark's query planner to exploit the existing data organization and avoid redundant shuffles.

## Limitations

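As a sketch of what this buys the user: with the default partitioner the scan reports clustering by the "predicate" and "subject" columns, so an aggregation over exactly those columns needs no extra shuffle. The reader call below follows the connector's documented read API (assumed here, not shown in this diff) and the endpoint is a placeholder.

```scala
import org.apache.spark.sql.SparkSession
import uk.co.gresearch.spark.dgraph.connector._

val spark = SparkSession.builder().master("local[*]").getOrCreate()

// Default partitioner "predicate+uid-range": the scan reports clustering
// by the "predicate" and "subject" columns to Spark.
val triples = spark.read.dgraph.triples("localhost:9080")

// Grouping by the reported columns can be planned without an extra shuffle,
// because the scan's outputPartitioning satisfies the required ClusteredDistribution.
triples.groupBy("subject", "predicate").count().explain()
```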
@@ -16,22 +16,34 @@

package uk.co.gresearch.spark.dgraph.connector

import org.apache.spark.sql.connector.read.{Batch, InputPartition, PartitionReaderFactory, Scan}
import org.apache.spark.sql.connector.read._
import org.apache.spark.sql.connector.read.partitioning.{ClusteredDistribution, Distribution, Partitioning}
import org.apache.spark.sql.types.StructType
import uk.co.gresearch.spark.dgraph.connector.model.GraphTableModel
import uk.co.gresearch.spark.dgraph.connector.partitioner.Partitioner

case class TripleScan(partitioner: Partitioner, model: GraphTableModel)
extends Scan
extends Scan with SupportsReportPartitioning
with Batch {

override def readSchema(): StructType = model.readSchema()

override def toBatch: Batch = this

override def planInputPartitions(): Array[InputPartition] = partitioner.getPartitions.toArray
private lazy val partitions: Array[InputPartition] = partitioner.getPartitions.toArray

override def planInputPartitions(): Array[InputPartition] = partitions

override def createReaderFactory(): PartitionReaderFactory =
TriplePartitionReaderFactory(model.withMetrics(AccumulatorPartitionMetrics()))

override def outputPartitioning(): Partitioning = new Partitioning {
def numPartitions: Int = partitions.length

def satisfy(distribution: Distribution): Boolean = distribution match {
case c: ClusteredDistribution =>
partitioner.getPartitionColumns.exists(_.forall(c.clusteredColumns.contains))
case _ => false
}
}
}
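The tests in this PR verify the effect by checking the physical plan for shuffle exchanges, including plans wrapped by adaptive query execution (commits 3a1012c and d973a0a). A minimal sketch of such a check against Spark 3.2 APIs could look like the following; the helper name mirrors the one in the commit messages, but its exact signature here is an assumption:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec

// Recursively look for a ShuffleExchangeExec, descending into adaptive plans,
// whose optimized sub-plan is not reachable via `children`.
def containsShuffleExchangeExec(plan: SparkPlan): Boolean = plan match {
  case _: ShuffleExchangeExec => true
  case adaptive: AdaptiveSparkPlanExec => containsShuffleExchangeExec(adaptive.executedPlan)
  case other => other.children.exists(containsShuffleExchangeExec)
}

// With partitioning reported by the scan, an aggregation over the reported
// columns should not introduce a shuffle.
def assertNoShuffle(df: DataFrame): Unit =
  assert(!containsShuffleExchangeExec(df.queryExecution.executedPlan))
```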
@@ -182,7 +182,8 @@ package object connector {
val AlphaPartitionerOption: String = "alpha"
val PredicatePartitionerOption: String = "predicate"
val UidRangePartitionerOption: String = "uid-range"
val PartitionerDefault: String = s"$PredicatePartitionerOption+$UidRangePartitionerOption"
val PredicateAndUidRangePartitionerOption: String = s"$PredicatePartitionerOption+$UidRangePartitionerOption"
val PartitionerDefault: String = PredicateAndUidRangePartitionerOption

val AlphaPartitionerPartitionsOption: String = "dgraph.partitioner.alpha.partitionsPerAlpha"
val AlphaPartitionerPartitionsDefault: Int = 1
@@ -16,18 +16,22 @@

package uk.co.gresearch.spark.dgraph.connector.partitioner

import uk.co.gresearch.spark.dgraph.connector.{ClusterState, Partition, Schema}
import uk.co.gresearch.spark.dgraph.connector.{AlphaPartitionerOption, ClusterState, Partition, Schema}

case class AlphaPartitioner(schema: Schema, clusterState: ClusterState, partitionsPerAlpha: Int)
extends Partitioner with ClusterStateHelper {

if (partitionsPerAlpha <= 0)
throw new IllegalArgumentException(s"partitionsPerAlpha must be larger than zero: $partitionsPerAlpha")

override def configOption: String = AlphaPartitionerOption

override def getPartitions: Seq[Partition] = {
PredicatePartitioner.getPartitions(
schema, clusterState, (group, _) => getGroupTargets(clusterState, group).size * partitionsPerAlpha, Set.empty, None
)
}

override def getPartitionColumns: Option[Seq[String]] = Some(Seq("predicate"))

}
@@ -37,7 +37,7 @@ class ConfigPartitionerOption extends PartitionerProviderOption
transaction: Option[Transaction],
options: CaseInsensitiveStringMap): Partitioner =
partitionerName match {
case SingletonPartitionerOption => SingletonPartitioner(getAllClusterTargets(clusterState), schema)
case SingletonPartitionerOption => SingletonPartitioner(schema, clusterState)
case GroupPartitionerOption => GroupPartitioner(schema, clusterState)
case AlphaPartitionerOption =>
AlphaPartitioner(schema, clusterState,
@@ -48,14 +48,13 @@ class ConfigPartitionerOption extends PartitionerProviderOption
case UidRangePartitionerOption =>
val uidsPerPartition = getIntOption(UidRangePartitionerUidsPerPartOption, options, UidRangePartitionerUidsPerPartDefault)
val estimator = getEstimatorOption(UidRangePartitionerEstimatorOption, options, UidRangePartitionerEstimatorDefault, clusterState)
val targets = getAllClusterTargets(clusterState)
val singleton = SingletonPartitioner(targets, schema)
UidRangePartitioner(singleton, uidsPerPartition, estimator)
case option if option.endsWith(s"+${UidRangePartitionerOption}") =>
val singleton = getPartitioner(SingletonPartitionerOption, schema, clusterState, transaction, options)
UidRangePartitioner(singleton, uidsPerPartition, estimator, innerPartitionerIsDefault = true)
case option if option.endsWith(s"+$UidRangePartitionerOption") =>
val name = option.substring(0, option.indexOf('+'))
val partitioner = getPartitioner(name, schema, clusterState, transaction, options)
getPartitioner(UidRangePartitionerOption, schema, clusterState, transaction, options)
.asInstanceOf[UidRangePartitioner].copy(partitioner = partitioner)
.asInstanceOf[UidRangePartitioner].copy(partitioner = partitioner, innerPartitionerIsDefault = false)
case unknown => throw new IllegalArgumentException(s"Unknown partitioner: $unknown")
}

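For readability, the resolution implemented by the match expression above can be summarized as a mapping from "dgraph.partitioner" values to partitioner instances. This is a reading aid only; the singleton and group option literals are not visible in this excerpt and are referenced by their constant names.

```scala
// Reading aid only; mirrors the match expression in ConfigPartitionerOption above.
//
//   SingletonPartitionerOption   -> SingletonPartitioner(schema, clusterState)
//   GroupPartitionerOption       -> GroupPartitioner(schema, clusterState)
//   "alpha"                      -> AlphaPartitioner(schema, clusterState, partitionsPerAlpha)
//   "predicate"                  -> PredicatePartitioner(...)
//   "uid-range"                  -> UidRangePartitioner(singleton, uidsPerPartition, estimator,
//                                                       innerPartitionerIsDefault = true)
//   "<inner>+uid-range"          -> UidRangePartitioner(<inner partitioner>, uidsPerPartition, estimator)
```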
@@ -16,15 +16,6 @@

package uk.co.gresearch.spark.dgraph.connector.partitioner

import org.apache.spark.sql.util.CaseInsensitiveStringMap
import uk.co.gresearch.spark.dgraph.connector._

class DefaultPartitionerOption extends ConfigPartitionerOption {

override def getPartitioner(schema: Schema,
clusterState: ClusterState,
transaction: Option[Transaction],
options: CaseInsensitiveStringMap): Option[Partitioner] =
Some(getPartitioner(PartitionerDefault, schema, clusterState, transaction, options))

}
class DefaultPartitionerOption extends PartitionerOption(PartitionerDefault)
@@ -16,15 +16,21 @@

package uk.co.gresearch.spark.dgraph.connector.partitioner

import uk.co.gresearch.spark.dgraph.connector.{ClusterState, Partition, Schema}
import uk.co.gresearch.spark.dgraph.connector.{ClusterState, GroupPartitionerOption, Partition, Schema}

case class GroupPartitioner(schema: Schema, clusterState: ClusterState)
extends Partitioner with ClusterStateHelper {

override def configOption: String = GroupPartitionerOption

override def getPartitions: Seq[Partition] =
clusterState.groupMembers.map { case (group, alphas) =>
(group, alphas, getGroupPredicates(clusterState, group, schema))
}.filter(_._3.nonEmpty).map { case (_, alphas, predicates) =>
val langs = predicates.filter(_.isLang).map(_.predicateName)
Partition(alphas.toSeq).has(predicates).langs(langs)
}.toSeq

override def getPartitionColumns: Option[Seq[String]] = Some(Seq("predicate"))

}
@@ -17,15 +17,27 @@
package uk.co.gresearch.spark.dgraph.connector.partitioner

import uk.co.gresearch.spark.dgraph.connector
import uk.co.gresearch.spark.dgraph.connector.{Filters, Partition, PartitionMetrics, Predicate}
import uk.co.gresearch.spark.dgraph.connector.{Filters, Partition, Predicate}

trait Partitioner {

/**
* The option string used for config "dgraph.partitioner" (PartitionerOption) to reference this partitioner.
* @return config option string
*/
def configOption: String

/**
* Gets the partitions.
*/
def getPartitions: Seq[Partition]

/**
* Gets the column names that represent this partitioner's partitioning.
* @return the partitioning column names, or None if no partitioning columns are reported
*/
def getPartitionColumns: Option[Seq[String]]

/**
* Indicates whether this partitioner supports all given filters.
* @param filters filters
@@ -0,0 +1,14 @@
package uk.co.gresearch.spark.dgraph.connector.partitioner

import org.apache.spark.sql.util.CaseInsensitiveStringMap
import uk.co.gresearch.spark.dgraph.connector.{ClusterState, Schema, Transaction}

class PartitionerOption(partitionerOption: String) extends ConfigPartitionerOption {

override def getPartitioner(schema: Schema,
clusterState: ClusterState,
transaction: Option[Transaction],
options: CaseInsensitiveStringMap): Option[Partitioner] =
Some(getPartitioner(partitionerOption, schema, clusterState, transaction, options))

}
@@ -17,22 +17,24 @@
package uk.co.gresearch.spark.dgraph.connector.partitioner

import org.apache.spark.sql.util.CaseInsensitiveStringMap
import uk.co.gresearch.spark.dgraph.connector.partitioner.PartitionerProvider.{configPartitionProvider, defaultPartitionProvider}
import uk.co.gresearch.spark.dgraph.connector.{ClusterState, Schema, Transaction}

trait PartitionerProvider {

val partitionerOptions = Seq(
new ConfigPartitionerOption(),
new DefaultPartitionerOption()
)

def getPartitioner(schema: Schema,
clusterState: ClusterState,
transaction: Option[Transaction],
options: CaseInsensitiveStringMap): Partitioner =
partitionerOptions
options: CaseInsensitiveStringMap,
defaultPartitionProvider: PartitionerProviderOption = defaultPartitionProvider): Partitioner =
Seq(configPartitionProvider, defaultPartitionProvider)
.flatMap(_.getPartitioner(schema, clusterState, transaction, options))
.headOption
.getOrElse(throw new RuntimeException("Could not find any suitable partitioner"))

}

object PartitionerProvider {
val defaultPartitionProvider: PartitionerProviderOption = new DefaultPartitionerOption()
val configPartitionProvider: PartitionerProviderOption = new ConfigPartitionerOption()
}
@@ -16,12 +16,11 @@

package uk.co.gresearch.spark.dgraph.connector.partitioner

import java.math.BigInteger
import java.security.MessageDigest

import uk.co.gresearch.spark.dgraph.connector
import uk.co.gresearch.spark.dgraph.connector._

import java.math.BigInteger
import java.security.MessageDigest
import scala.language.implicitConversions

case class PredicatePartitioner(schema: Schema,
Expand All @@ -34,6 +33,8 @@ case class PredicatePartitioner(schema: Schema,
if (predicatesPerPartition <= 0)
throw new IllegalArgumentException(s"predicatesPerPartition must be larger than zero: $predicatesPerPartition")

override def configOption: String = PredicatePartitionerOption

def getPartitionsForPredicates(predicates: Set[_]): Int =
if (predicates.isEmpty) 1 else 1 + (predicates.size - 1) / predicatesPerPartition

@@ -72,6 +73,8 @@ case class PredicatePartitioner(schema: Schema,
PredicatePartitioner.getPartitions(schema, cState, (_, predicates) => getPartitionsForPredicates(predicates), simplifiedFilters, projection)
}

override def getPartitionColumns: Option[Seq[String]] = Some(Seq("predicate"))

/**
* Replaces ObjectTypeIsIn filter in required and optional filters
* using replaceObjectTypeIsInFilter(filters: Seq[Filter]).
@@ -16,13 +16,31 @@

package uk.co.gresearch.spark.dgraph.connector.partitioner

import uk.co.gresearch.spark.dgraph.connector.{Partition, Schema, Target}
import uk.co.gresearch.spark.dgraph.connector.{ClusterState, EmptyFilters, Filters, Predicate, Schema, SingletonPartitionerOption}

case class SingletonPartitioner(targets: Seq[Target], schema: Schema) extends Partitioner {
/**
* This partitioner produces a single partition while providing the features
* of the PredicatePartitioner (filter and projection pushdown) for that single partition.
*/
class SingletonPartitioner(override val schema: Schema,
override val clusterState: ClusterState,
override val filters: Filters = EmptyFilters,
override val projection: Option[Seq[Predicate]] = None)
extends PredicatePartitioner(schema, clusterState, Int.MaxValue, filters, projection) {

override def configOption: String = SingletonPartitionerOption

override def getPartitionColumns: Option[Seq[String]] = None

override def getPartitions: Seq[Partition] = {
val langs = schema.predicates.filter(_.isLang).map(_.predicateName)
Seq(Partition(targets).has(schema.predicates).langs(langs))
}
override def withFilters(filters: Filters): Partitioner =
new SingletonPartitioner(schema, clusterState, filters, projection)

override def withProjection(projection: Seq[Predicate]): Partitioner =
new SingletonPartitioner(schema, clusterState, filters, Some(projection))

}

object SingletonPartitioner {
def apply(schema: Schema, clusterState: ClusterState): SingletonPartitioner =
new SingletonPartitioner(schema, clusterState)
}
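A minimal sketch of the new behaviour, assuming `schema`, `clusterState`, `someFilters`, and `someProjection` are available (all hypothetical here): the singleton partitioner now reuses the PredicatePartitioner machinery for pushdown while reporting no partition columns, so Spark does not assume any clustering from it.

```scala
// Hypothetical values; in the connector these come from the Dgraph cluster and the pushed-down query.
val singleton = SingletonPartitioner(schema, clusterState)

singleton.configOption          // SingletonPartitionerOption
singleton.getPartitionColumns   // None: no clustering is reported for a single partition

// Filter and projection pushdown flow through the inherited PredicatePartitioner logic,
// each call returning a new SingletonPartitioner.
val withFilters: Partitioner    = singleton.withFilters(someFilters)
val withProjection: Partitioner = singleton.withProjection(someProjection)
```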
@@ -17,9 +17,13 @@
package uk.co.gresearch.spark.dgraph.connector.partitioner

import uk.co.gresearch.spark.dgraph.connector
import uk.co.gresearch.spark.dgraph.connector.{Filter, Filters, Partition, Uid, UidRange}
import uk.co.gresearch.spark.dgraph.connector.{Filter, Filters, Partition, Uid, UidRange, UidRangePartitionerOption}

case class UidRangePartitioner(partitioner: Partitioner, uidsPerPartition: Int, uidCardinalityEstimator: UidCardinalityEstimator) extends Partitioner {
case class UidRangePartitioner(partitioner: Partitioner,
uidsPerPartition: Int,
uidCardinalityEstimator: UidCardinalityEstimator,
innerPartitionerIsDefault: Boolean = false)
extends Partitioner {

if (partitioner == null)
throw new IllegalArgumentException("partitioner must not be null")
@@ -33,6 +37,12 @@ case class UidRangePartitioner(partitioner: Partitioner, uidsPerPartition: Int,
throw new IllegalArgumentException(s"UidRangePartitioner cannot be combined with " +
s"another uid partitioner: ${partitioner.getClass.getSimpleName}")

override def configOption: String = if (innerPartitionerIsDefault) {
UidRangePartitionerOption
} else {
s"${partitioner.configOption}+$UidRangePartitionerOption"
}

override def supportsFilters(filters: Set[Filter]): Boolean = partitioner.supportsFilters(filters)

override def withFilters(filters: Filters): UidRangePartitioner = copy(partitioner = partitioner.withFilters(filters))
@@ -66,4 +76,8 @@ case class UidRangePartitioner(partitioner: Partitioner, uidsPerPartition: Int,
}
}

override def getPartitionColumns: Option[Seq[String]] =
partitioner.getPartitionColumns.map(_ :+ "subject")
.orElse(Some(Seq("subject")))

}
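To summarize the two derived members above (hypothetical inner partitioners assumed; `n` and `est` stand for uidsPerPartition and the estimator):

```scala
// configOption reconstructs the option string that would have produced this partitioner:
UidRangePartitioner(predicatePartitioner, n, est).configOption
  // "predicate+uid-range"
UidRangePartitioner(singletonPartitioner, n, est, innerPartitionerIsDefault = true).configOption
  // "uid-range"

// getPartitionColumns extends the inner partitioner's clustering columns with "subject",
// since each partition additionally covers a distinct uid range:
UidRangePartitioner(predicatePartitioner, n, est).getPartitionColumns
  // Some(Seq("predicate", "subject"))
UidRangePartitioner(singletonPartitioner, n, est, innerPartitionerIsDefault = true).getPartitionColumns
  // Some(Seq("subject")), because the singleton partitioner reports no columns of its own
```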