Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Partition Assignment V2. #1917

Open
wants to merge 2 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/scala.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ jobs:
- name: Run tests
run: .github/workflows/runtests.sh
- name: Coverage Reports
uses: actions/upload-artifact@v3
uses: actions/upload-artifact@v4
with:
name: scoverage-report
path: target/scala-*/scoverage-report/**
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,34 @@ import filodb.query.LogicalPlan._
import filodb.query.exec._

//scalastyle:off file.size.limit
/**
 * Describes one partition entry inside a partition assignment.
 *
 * @param partitionName name identifying the partition
 * @param httpEndPoint  HTTP endpoint of the partition
 * @param grpcEndPoint  gRPC endpoint of the partition, if one is available
 * @param proportion    proportion associated with this partition; a single-partition
 *                      PartitionAssignment uses 1.0f. NOTE(review): presumably the
 *                      proportions of one assignment are expected to sum to 1.0f -- confirm.
 */
case class PartitionDetails(
  partitionName: String,
  httpEndPoint: String,
  grpcEndPoint: Option[String],
  proportion: Float
)
trait PartitionAssignmentTrait {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpick: maybe we can add a Scaladoc for the trait that outlines the use of proportionMap and timeRange, with some examples?

val proportionMap: Map[String, PartitionDetails]
val timeRange: TimeRange
}

case class PartitionAssignment(partitionName: String, httpEndPoint: String, timeRange: TimeRange,
grpcEndPoint: Option[String] = None)
grpcEndPoint: Option[String] = None) extends PartitionAssignmentTrait {
val proportionMap: Map[String, PartitionDetails] =
Map(partitionName -> PartitionDetails(partitionName, httpEndPoint, grpcEndPoint, 1.0f))
}
case class PartitionAssignmentV2(proportionMap: Map[String, PartitionDetails],
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add an assertion in PartitionAssignmentV2 that all the proportions in proportionMap sum up to 1.0f?

timeRange: TimeRange) extends PartitionAssignmentTrait

/**
 * Looks up the partition assignments for a routing key or a set of shard-key filters
 * over a time range.
 *
 * The two abstract methods return the concrete PartitionAssignment type. The two
 * "Trait" variants return the more general PartitionAssignmentTrait and, unless
 * overridden, simply delegate to the abstract methods.
 */
trait PartitionLocationProvider {

  /**
   * Kept for backward compatibility.
   *
   * @param routingKey routing key to look up
   * @param timeRange  time range to look up
   * @return the partition assignments for the key and range
   */
  def getPartitions(routingKey: Map[String, String], timeRange: TimeRange): List[PartitionAssignment]

  /**
   * @param nonMetricShardKeyFilters column filters to look up
   * @param timeRange                time range to look up
   * @return the partition assignments for the filters and range
   */
  def getMetadataPartitions(nonMetricShardKeyFilters: Seq[ColumnFilter],
                            timeRange: TimeRange): List[PartitionAssignment]

  /**
   * Same lookup as getPartitions, typed as PartitionAssignmentTrait.
   * The default implementation forwards to getPartitions.
   */
  def getPartitionsTrait(routingKey: Map[String, String], timeRange: TimeRange): List[PartitionAssignmentTrait] =
    getPartitions(routingKey, timeRange)

  /**
   * Same lookup as getMetadataPartitions, typed as PartitionAssignmentTrait.
   * The default implementation forwards to getMetadataPartitions.
   */
  def getMetadataPartitionsTrait(nonMetricShardKeyFilters: Seq[ColumnFilter],
                                 timeRange: TimeRange): List[PartitionAssignmentTrait] =
    getMetadataPartitions(nonMetricShardKeyFilters, timeRange)
}

/**
Expand Down Expand Up @@ -166,9 +185,9 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv
val partitions = getPartitions(logicalPlan, paramToCheckPartitions)
if (isSinglePartition(partitions)) {
val (partitionName, startMs, endMs, grpcEndpoint) = partitions.headOption match {
case Some(pa: PartitionAssignment)
=> (pa.partitionName, params.startSecs * 1000L,
params.endSecs * 1000L, pa.grpcEndPoint)
case Some(pa: PartitionAssignmentTrait)
=> (pa.proportionMap.keys.head, params.startSecs * 1000L,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpick: should we add a simple helper function in PartitionAssignmentTrait to get the keys and values of proportionMap?

params.endSecs * 1000L, pa.proportionMap.values.head.grpcEndPoint)
case None => (localPartitionName, params.startSecs * 1000L, params.endSecs * 1000L, None)
}

Expand Down Expand Up @@ -198,7 +217,7 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv
PromQLGrpcRemoteExec(channel, remoteHttpTimeoutMs, remoteContext, inProcessPlanDispatcher,
dataset.ref, plannerSelector)
} else {
val remotePartitionEndpoint = partitions.head.httpEndPoint
val remotePartitionEndpoint = partitions.head.proportionMap.values.head.httpEndPoint
val httpEndpoint = remotePartitionEndpoint + params.remoteQueryPath.getOrElse("")
PromQlRemoteExec(httpEndpoint, remoteHttpTimeoutMs, remoteContext, inProcessPlanDispatcher,
dataset.ref, remoteExecHttpClient)
Expand Down Expand Up @@ -330,7 +349,7 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv
val partitions = if (routingKeys.isEmpty) List.empty
else {
routingKeys.flatMap{ keys =>
partitionLocationProvider.getPartitions(keys, queryTimeRange).
partitionLocationProvider.getPartitionsTrait(keys, queryTimeRange).
sortBy(_.timeRange.startMs)
}.toList
}
Expand Down Expand Up @@ -388,24 +407,46 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv
PlanResult(execPlan:: Nil)
}

private def materializeForPartition(logicalPlan: LogicalPlan,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpick: please add a Scala doc

partition: PartitionAssignmentTrait,
queryContext: QueryContext,
timeRangeOverride: Option[TimeRange] = None): ExecPlan = {
partition match {
case PartitionAssignment(partitionName, httpEndPoint, _, grpcEndPoint) =>
materializeForPartition(logicalPlan, partitionName, grpcEndPoint, httpEndPoint, queryContext, timeRangeOverride)
case PartitionAssignmentV2(proportionMap, _) =>
val plans = proportionMap.map(entry => {
val partitionDetails = entry._2
materializeForPartition(logicalPlan, partitionDetails.partitionName,
partitionDetails.grpcEndPoint, partitionDetails.httpEndPoint, queryContext, timeRangeOverride)
}).toSeq
if (plans.size > 1) {
val dispatcher = PlannerUtil.pickDispatcher(plans)
MultiPartitionDistConcatExec(queryContext, dispatcher, plans)
} else {
plans.head
}
}
}
/**
* If the argument partition is local, materialize the LogicalPlan with the local planner.
* Otherwise, create a PromQlRemoteExec.
* @param timeRangeOverride: if given, the plan will be materialized to this range. Otherwise, the
* range is computed from the PromQlQueryParams.
*/
private def materializeForPartition(logicalPlan: LogicalPlan,
partition: PartitionAssignment,
partitionName: String,
grpcEndpoint: Option[String],
httpEndPoint: String,
queryContext: QueryContext,
timeRangeOverride: Option[TimeRange] = None): ExecPlan = {
timeRangeOverride: Option[TimeRange]): ExecPlan = {
val qContextWithOverride = timeRangeOverride.map{ r =>
val oldParams = queryContext.origQueryParams.asInstanceOf[PromQlQueryParams]
val newParams = oldParams.copy(startSecs = r.startMs / 1000, endSecs = r.endMs / 1000)
queryContext.copy(origQueryParams = newParams)
}.getOrElse(queryContext)
val queryParams = qContextWithOverride.origQueryParams.asInstanceOf[PromQlQueryParams]
val timeRange = timeRangeOverride.getOrElse(TimeRange(1000 * queryParams.startSecs, 1000 * queryParams.endSecs))
val (partitionName, grpcEndpoint) = (partition.partitionName, partition.grpcEndPoint)
if (partitionName.equals(localPartitionName)) {
// FIXME: the below check is needed because subquery tests fail when their
// time-ranges are updated even with the original query params.
Expand All @@ -423,7 +464,7 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv
PromQLGrpcRemoteExec(channel, remoteHttpTimeoutMs, ctx, inProcessPlanDispatcher,
dataset.ref, plannerSelector)
} else {
val httpEndpoint = partition.httpEndPoint + queryParams.remoteQueryPath.getOrElse("")
val httpEndpoint = httpEndPoint + queryParams.remoteQueryPath.getOrElse("")
PromQlRemoteExec(httpEndpoint, remoteHttpTimeoutMs,
ctx, inProcessPlanDispatcher, dataset.ref, remoteExecHttpClient)
}
Expand All @@ -442,9 +483,9 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv
* @param stepMsOpt occupied iff the returned ranges should describe periodic steps
* (i.e. all range start times (except the first) should be snapped to a step)
*/
private def getAssignmentQueryRanges(assignments: Seq[PartitionAssignment], queryRange: TimeRange,
private def getAssignmentQueryRanges(assignments: Seq[PartitionAssignmentTrait], queryRange: TimeRange,
lookbackMs: Long = 0L, offsetMs: Seq[Long] = Seq(0L),
stepMsOpt: Option[Long] = None): Seq[(PartitionAssignment, TimeRange)] = {
stepMsOpt: Option[Long] = None): Seq[(PartitionAssignmentTrait, TimeRange)] = {
// Construct a sequence of Option[TimeRange]; the ith range is None iff the ith partition has no range to query.
// First partition doesn't need its start snapped to a periodic step, so deal with it separately.
val filteredAssignments = assignments
Expand Down Expand Up @@ -612,7 +653,7 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv
} else {
// materialize a plan for each range/assignment pair
val (_, execPlans) = assignmentRanges.foldLeft(
(None: Option[(PartitionAssignment, TimeRange)], ListBuffer.empty[ExecPlan])) {
(None: Option[(PartitionAssignmentTrait, TimeRange)], ListBuffer.empty[ExecPlan])) {
case (acc, next) => acc match {
case (Some((_, prevTimeRange)), ep: ListBuffer[ExecPlan]) =>
val (currentAssignment, currentTimeRange) = next
Expand Down Expand Up @@ -810,7 +851,8 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv
localPartitionPlanner.materialize(lp, qContext)
}
else {
val execPlans = partitions.map { p =>
val execPlans = partitions.flatMap(ps => ps.proportionMap.values.map(pd =>
PartitionAssignment(pd.partitionName, pd.httpEndPoint, ps.timeRange, pd.grpcEndPoint))).map { p =>
logger.debug(s"partitionInfo=$p; queryParams=$queryParams")
if (p.partitionName.equals(localPartitionName))
localPartitionPlanner.materialize(
Expand Down Expand Up @@ -855,7 +897,8 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv
logger.warn(s"no partitions found for $lp; defaulting to local planner")
localPartitionPlanner.materialize(lp, qContext)
} else {
val execPlans = partitions.map { p =>
val execPlans = partitions.flatMap(ps => ps.proportionMap.values.map(pd =>
PartitionAssignment(pd.partitionName, pd.httpEndPoint, ps.timeRange, pd.grpcEndPoint))).map { p =>
logger.debug(s"partition=$p; plan=$lp")
if (p.partitionName.equals(localPartitionName))
localPartitionPlanner.materialize(lp, qContext)
Expand All @@ -874,9 +917,10 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv
PlanResult(execPlan::Nil)
}

private def getMetadataPartitions(filters: Seq[ColumnFilter], timeRange: TimeRange): List[PartitionAssignment] = {
private def getMetadataPartitions(filters: Seq[ColumnFilter],
timeRange: TimeRange): List[PartitionAssignmentTrait] = {
val nonMetricShardKeyFilters = filters.filter(f => dataset.options.nonMetricShardColumns.contains(f.column))
partitionLocationProvider.getMetadataPartitions(nonMetricShardKeyFilters, timeRange)
partitionLocationProvider.getMetadataPartitionsTrait(nonMetricShardKeyFilters, timeRange)
}

private def createMetadataRemoteExec(qContext: QueryContext, partitionAssignment: PartitionAssignment,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ abstract class PartitionLocationPlanner(dataset: Dataset,
*/
protected def getPartitions(logicalPlan: LogicalPlan,
queryParams: PromQlQueryParams,
infiniteTimeRange: Boolean = false) : Seq[PartitionAssignment] = {
infiniteTimeRange: Boolean = false) : Seq[PartitionAssignmentTrait] = {

//1. Get a Seq of all Leaf node filters
val leafFilters = LogicalPlan.getColumnFilterGroup(logicalPlan)
Expand Down Expand Up @@ -71,19 +71,19 @@ abstract class PartitionLocationPlanner(dataset: Dataset,

//4. Based on the map in 2 and time range in 5, get the partitions to query
routingKeyMap.flatMap(metricMap =>
partitionLocationProvider.getPartitions(metricMap, queryTimeRange))
partitionLocationProvider.getPartitionsTrait(metricMap, queryTimeRange))
}
// scalastyle:on method.length

/**
* Checks if all the PartitionAssignments belong to the same partition
*/
protected def isSinglePartition(partitions: Seq[PartitionAssignment]) : Boolean = {
protected def isSinglePartition(partitions: Seq[PartitionAssignmentTrait]) : Boolean = {
if (partitions.isEmpty)
true
else {
val partName = partitions.head.partitionName
partitions.forall(_.partitionName.equals(partName))
val pSet = partitions.flatMap(p => p.proportionMap.keys)
pSet.forall(p => p.equals(pSet.head))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ class ShardKeyRegexPlanner(val dataset: Dataset,
val shardKeys = getShardKeys(logicalPlan)
val partitions = shardKeys
.flatMap(filters => getPartitions(logicalPlan.replaceFilters(filters), qParams))
.map(_.partitionName)
.flatMap(_.proportionMap.keys)
.distinct
// NOTE: don't use partitions.size < 2. When partitions == 0, generateExec will not
// materialize any plans because there are no partitions against which it should materialize.
Expand Down Expand Up @@ -240,7 +240,7 @@ class ShardKeyRegexPlanner(val dataset: Dataset,
val newLogicalPlan = logicalPlan.replaceFilters(key)
val newQueryParams = queryParams.copy(promQl = LogicalPlanParser.convertToQuery(newLogicalPlan))
val partitions = getPartitions(newLogicalPlan, newQueryParams)
.map(_.partitionName)
.flatMap(_.proportionMap.keys)
.distinct
if (partitions.size > 1) {
partitionSplitKeys.append(key)
Expand Down
Loading