Skip to content

Commit

Permalink
Feature/controlling execution (#662)
Browse files Browse the repository at this point in the history
* Using  instead of  for Database members.

* Fixing collection type.

* Removing toSeq manual conversion

* Changing the collection used to Seq.

* More linting

* Updating readmes with more informationg

* Adding more docs

* Moving stuff over

* Adding batch documentation

* Adding schema bug investigation.

* Adding a databaseprovider.

* Removing bad files

* Fixing log4j

* Adding a WARN statement

* Adding optimised future semantics [version skip]
  • Loading branch information
alexflav23 authored Apr 17, 2017
1 parent 1a13f62 commit 951dcaa
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -161,16 +161,14 @@ class CreateQuery[

override def qb: CQLQuery = (withClause merge WithPart.empty merge usingPart) build init

private[phantom] def indexList(name: String): ExecutableStatementList = {
new ExecutableStatementList(table.secondaryKeys map {
key => {
if (key.isMapKeyIndex) {
QueryBuilder.Create.mapIndex(table.tableName, name, key.name)
} else if (key.isMapEntryIndex) {
QueryBuilder.Create.mapEntries(table.tableName, name, key.name)
} else {
QueryBuilder.Create.index(table.tableName, name, key.name)
}
private[phantom] def indexList(name: String): ExecutableStatementList[Seq] = {
new ExecutableStatementList(table.secondaryKeys map { key =>
if (key.isMapKeyIndex) {
QueryBuilder.Create.mapIndex(table.tableName, name, key.name)
} else if (key.isMapEntryIndex) {
QueryBuilder.Create.mapEntries(table.tableName, name, key.name)
} else {
QueryBuilder.Create.index(table.tableName, name, key.name)
}
})
}
Expand All @@ -182,14 +180,10 @@ class CreateQuery[
if (table.secondaryKeys.isEmpty) {
scalaQueryStringExecuteToFuture(new SimpleStatement(qb.terminate.queryString))
} else {
super.future() flatMap {
res => {
indexList(keySpace.name).future() map {
_ => {
Manager.logger.debug(s"Creating secondary indexes on ${QueryBuilder.keyspace(keySpace.name, table.tableName).queryString}")
res
}
}
super.future() flatMap { res =>
indexList(keySpace.name).future() map { _ =>
Manager.logger.debug(s"Creating secondary indexes on ${QueryBuilder.keyspace(keySpace.name, table.tableName).queryString}")
res
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,35 +104,68 @@ trait ExecutableStatement extends CassandraOperations {
}
}

class ExecutableStatementList(val queries: Seq[CQLQuery]) extends CassandraOperations {
private[this] object SequentialFutures {
def sequencedTraverse[
A,
B,
M[X] <: TraversableOnce[X]
](in: M[A])(fn: A => ScalaFuture[B])(implicit
executor: ExecutionContextExecutor,
cbf: CanBuildFrom[M[A], B, M[B]]
): ScalaFuture[M[B]] = {
in.foldLeft(ScalaFuture.successful(cbf(in))) { (fr, a) =>
for (r <- fr; b <- fn(a)) yield r += b
}.map(_.result())
}
}

/**
* Secondary constructor to allow passing in Sets instead of Sequences.
* Although this may appear to be fruitless and uninteresting it a necessary evil.
*
* The TwitterFuture.collect method does not support passing in arbitrary collections using the Scala API
* just as Scala.future does. Scala Futures can sequence over traversables and return a collection of the appropiate type.
*
* @param queries The list of CQL queries to execute.
* @return An instance of an ExecutableStatement with the matching sequence of CQL queries.
*/
def this(queries: Set[CQLQuery]) = this(queries.toSeq)
class ExecutableStatementList[
M[X] <: TraversableOnce[X]
](val queries: M[CQLQuery])(
implicit cbf: CanBuildFrom[M[CQLQuery], CQLQuery, M[CQLQuery]]
) extends CassandraOperations {

def add(appendable: Seq[CQLQuery]): ExecutableStatementList = {
new ExecutableStatementList(queries ++ appendable)
def add(appendable: M[CQLQuery]): ExecutableStatementList[M] = {
val builder = cbf(queries)
for (q <- appendable) builder += q
new ExecutableStatementList(builder.result())
}

def ++(appendable: Seq[CQLQuery]): ExecutableStatementList = add(appendable)
def ++(appendable: M[CQLQuery]): ExecutableStatementList[M] = add(appendable)

def ++(st: ExecutableStatementList): ExecutableStatementList = add(st.queries)
def ++(st: ExecutableStatementList[M]): ExecutableStatementList[M] = add(st.queries)

/** Transforms a `TraversableOnce[A]` into a `Future[TraversableOnce[B]]` using the provided function `A => Future[B]`.
* This is useful for performing a parallel map. For example, to apply a function to all items of a list
* in parallel:
*
* {{{
* val myFutureList = Future.traverse(myList)(x => Future(myFunc(x)))
* }}}
*/

def future()(
implicit session: Session,
ec: ExecutionContextExecutor
): ScalaFuture[Seq[ResultSet]] = {
ScalaFuture.sequence(queries.map(item => {
scalaQueryStringExecuteToFuture(new SimpleStatement(item.terminate.queryString))
}))
ec: ExecutionContextExecutor,
fbf: CanBuildFrom[Nothing, ScalaFuture[ResultSet], M[ScalaFuture[ResultSet]]],
ebf: CanBuildFrom[M[ScalaFuture[ResultSet]], ResultSet, M[ResultSet]]
): ScalaFuture[M[ResultSet]] = {

val builder = fbf()

for (q <- queries) builder += scalaQueryStringExecuteToFuture(new SimpleStatement(q.terminate.queryString))

ScalaFuture.sequence(builder.result())(ebf, ec)
}

def sequentialFuture()(
implicit session: Session,
ec: ExecutionContextExecutor,
cbf: CanBuildFrom[M[CQLQuery], ResultSet, M[ResultSet]]
): ScalaFuture[M[ResultSet]] = {
SequentialFutures.sequencedTraverse(queries) {
q => scalaQueryStringExecuteToFuture(new SimpleStatement(q.terminate.queryString))
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ abstract class Database[
* @return An executable statement list that can be used with Scala or Twitter futures to simultaneously
* execute an entire sequence of queries.
*/
private[phantom] def autodrop(): ExecutableStatementList = {
private[phantom] def autodrop(): ExecutableStatementList[Seq] = {
new ExecutableStatementList(tables.map {
table => table.alter().drop().qb
})
Expand Down Expand Up @@ -137,7 +137,7 @@ abstract class Database[
* @return An executable statement list that can be used with Scala or Twitter futures to simultaneously
* execute an entire sequence of queries.
*/
private[phantom] def autotruncate(): ExecutableStatementList = {
private[phantom] def autotruncate(): ExecutableStatementList[Seq] = {
new ExecutableStatementList(tables.map(_.truncate().qb))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ package object finagle {
}
}

implicit class ExecutableStatementListAugmenter(val list: ExecutableStatementList) extends AnyVal {
implicit class ExecutableStatementListAugmenter(val list: ExecutableStatementList[Seq]) extends AnyVal {
def execute()(implicit session: Session, executor: Executor): Future[Seq[ResultSet]] = {
Future.collect(list.queries.map(item => {
twitterQueryStringExecuteToFuture(new SimpleStatement(item.terminate.queryString))
Expand Down
2 changes: 1 addition & 1 deletion project/Publishing.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import scala.util.Properties
object Publishing {

val defaultPublishingSettings = Seq(
version := "2.6.5"
version := "2.7.0"
)

lazy val noPublishSettings = Seq(
Expand Down

0 comments on commit 951dcaa

Please sign in to comment.