From 951dcaa5da64070e20b899e20c9b733fdd70f450 Mon Sep 17 00:00:00 2001 From: Flavian Alexandru Date: Mon, 17 Apr 2017 11:59:53 +0100 Subject: [PATCH] Feature/controlling execution (#662) * Using instead of for Database members. * Fixing collection type. * Removing toSeq manual conversion * Changing the collection used to Seq. * More linting * Updating readmes with more informationg * Adding more docs * Moving stuff over * Adding batch documentation * Adding schema bug investigation. * Adding a databaseprovider. * Removing bad files * Fixing log4j * Adding a WARN statement * Adding optimised future semantics [version skip] --- .../phantom/builder/query/CreateQuery.scala | 30 +++----- .../builder/query/ExecutableQuery.scala | 75 +++++++++++++------ .../phantom/database/Database.scala | 4 +- .../outworkers/phantom/finagle/package.scala | 2 +- project/Publishing.scala | 2 +- 5 files changed, 70 insertions(+), 43 deletions(-) diff --git a/phantom-dsl/src/main/scala/com/outworkers/phantom/builder/query/CreateQuery.scala b/phantom-dsl/src/main/scala/com/outworkers/phantom/builder/query/CreateQuery.scala index 7878220dc..5f4a146a6 100644 --- a/phantom-dsl/src/main/scala/com/outworkers/phantom/builder/query/CreateQuery.scala +++ b/phantom-dsl/src/main/scala/com/outworkers/phantom/builder/query/CreateQuery.scala @@ -161,16 +161,14 @@ class CreateQuery[ override def qb: CQLQuery = (withClause merge WithPart.empty merge usingPart) build init - private[phantom] def indexList(name: String): ExecutableStatementList = { - new ExecutableStatementList(table.secondaryKeys map { - key => { - if (key.isMapKeyIndex) { - QueryBuilder.Create.mapIndex(table.tableName, name, key.name) - } else if (key.isMapEntryIndex) { - QueryBuilder.Create.mapEntries(table.tableName, name, key.name) - } else { - QueryBuilder.Create.index(table.tableName, name, key.name) - } + private[phantom] def indexList(name: String): ExecutableStatementList[Seq] = { + new ExecutableStatementList(table.secondaryKeys map { key => + if (key.isMapKeyIndex) { + QueryBuilder.Create.mapIndex(table.tableName, name, key.name) + } else if (key.isMapEntryIndex) { + QueryBuilder.Create.mapEntries(table.tableName, name, key.name) + } else { + QueryBuilder.Create.index(table.tableName, name, key.name) } }) } @@ -182,14 +180,10 @@ class CreateQuery[ if (table.secondaryKeys.isEmpty) { scalaQueryStringExecuteToFuture(new SimpleStatement(qb.terminate.queryString)) } else { - super.future() flatMap { - res => { - indexList(keySpace.name).future() map { - _ => { - Manager.logger.debug(s"Creating secondary indexes on ${QueryBuilder.keyspace(keySpace.name, table.tableName).queryString}") - res - } - } + super.future() flatMap { res => + indexList(keySpace.name).future() map { _ => + Manager.logger.debug(s"Creating secondary indexes on ${QueryBuilder.keyspace(keySpace.name, table.tableName).queryString}") + res } } } diff --git a/phantom-dsl/src/main/scala/com/outworkers/phantom/builder/query/ExecutableQuery.scala b/phantom-dsl/src/main/scala/com/outworkers/phantom/builder/query/ExecutableQuery.scala index d4a1a3261..bc09be2c7 100644 --- a/phantom-dsl/src/main/scala/com/outworkers/phantom/builder/query/ExecutableQuery.scala +++ b/phantom-dsl/src/main/scala/com/outworkers/phantom/builder/query/ExecutableQuery.scala @@ -104,35 +104,68 @@ trait ExecutableStatement extends CassandraOperations { } } -class ExecutableStatementList(val queries: Seq[CQLQuery]) extends CassandraOperations { +private[this] object SequentialFutures { + def sequencedTraverse[ + A, + B, + M[X] <: TraversableOnce[X] + ](in: M[A])(fn: A => ScalaFuture[B])(implicit + executor: ExecutionContextExecutor, + cbf: CanBuildFrom[M[A], B, M[B]] + ): ScalaFuture[M[B]] = { + in.foldLeft(ScalaFuture.successful(cbf(in))) { (fr, a) => + for (r <- fr; b <- fn(a)) yield r += b + }.map(_.result()) + } +} - /** - * Secondary constructor to allow passing in Sets instead of Sequences. - * Although this may appear to be fruitless and uninteresting it a necessary evil. - * - * The TwitterFuture.collect method does not support passing in arbitrary collections using the Scala API - * just as Scala.future does. Scala Futures can sequence over traversables and return a collection of the appropiate type. - * - * @param queries The list of CQL queries to execute. - * @return An instance of an ExecutableStatement with the matching sequence of CQL queries. - */ - def this(queries: Set[CQLQuery]) = this(queries.toSeq) +class ExecutableStatementList[ + M[X] <: TraversableOnce[X] +](val queries: M[CQLQuery])( + implicit cbf: CanBuildFrom[M[CQLQuery], CQLQuery, M[CQLQuery]] +) extends CassandraOperations { - def add(appendable: Seq[CQLQuery]): ExecutableStatementList = { - new ExecutableStatementList(queries ++ appendable) + def add(appendable: M[CQLQuery]): ExecutableStatementList[M] = { + val builder = cbf(queries) + for (q <- appendable) builder += q + new ExecutableStatementList(builder.result()) } - def ++(appendable: Seq[CQLQuery]): ExecutableStatementList = add(appendable) + def ++(appendable: M[CQLQuery]): ExecutableStatementList[M] = add(appendable) - def ++(st: ExecutableStatementList): ExecutableStatementList = add(st.queries) + def ++(st: ExecutableStatementList[M]): ExecutableStatementList[M] = add(st.queries) + + /** Transforms a `TraversableOnce[A]` into a `Future[TraversableOnce[B]]` using the provided function `A => Future[B]`. + * This is useful for performing a parallel map. For example, to apply a function to all items of a list + * in parallel: + * + * {{{ + * val myFutureList = Future.traverse(myList)(x => Future(myFunc(x))) + * }}} + */ def future()( implicit session: Session, - ec: ExecutionContextExecutor - ): ScalaFuture[Seq[ResultSet]] = { - ScalaFuture.sequence(queries.map(item => { - scalaQueryStringExecuteToFuture(new SimpleStatement(item.terminate.queryString)) - })) + ec: ExecutionContextExecutor, + fbf: CanBuildFrom[Nothing, ScalaFuture[ResultSet], M[ScalaFuture[ResultSet]]], + ebf: CanBuildFrom[M[ScalaFuture[ResultSet]], ResultSet, M[ResultSet]] + ): ScalaFuture[M[ResultSet]] = { + + val builder = fbf() + + for (q <- queries) builder += scalaQueryStringExecuteToFuture(new SimpleStatement(q.terminate.queryString)) + + ScalaFuture.sequence(builder.result())(ebf, ec) + } + + def sequentialFuture()( + implicit session: Session, + ec: ExecutionContextExecutor, + cbf: CanBuildFrom[M[CQLQuery], ResultSet, M[ResultSet]] + ): ScalaFuture[M[ResultSet]] = { + SequentialFutures.sequencedTraverse(queries) { + q => scalaQueryStringExecuteToFuture(new SimpleStatement(q.terminate.queryString)) + } } } diff --git a/phantom-dsl/src/main/scala/com/outworkers/phantom/database/Database.scala b/phantom-dsl/src/main/scala/com/outworkers/phantom/database/Database.scala index 38f48ffd9..559dead78 100644 --- a/phantom-dsl/src/main/scala/com/outworkers/phantom/database/Database.scala +++ b/phantom-dsl/src/main/scala/com/outworkers/phantom/database/Database.scala @@ -97,7 +97,7 @@ abstract class Database[ * @return An executable statement list that can be used with Scala or Twitter futures to simultaneously * execute an entire sequence of queries. */ - private[phantom] def autodrop(): ExecutableStatementList = { + private[phantom] def autodrop(): ExecutableStatementList[Seq] = { new ExecutableStatementList(tables.map { table => table.alter().drop().qb }) @@ -137,7 +137,7 @@ abstract class Database[ * @return An executable statement list that can be used with Scala or Twitter futures to simultaneously * execute an entire sequence of queries. */ - private[phantom] def autotruncate(): ExecutableStatementList = { + private[phantom] def autotruncate(): ExecutableStatementList[Seq] = { new ExecutableStatementList(tables.map(_.truncate().qb)) } diff --git a/phantom-finagle/src/main/scala/com/outworkers/phantom/finagle/package.scala b/phantom-finagle/src/main/scala/com/outworkers/phantom/finagle/package.scala index ea3295c73..afc326c40 100644 --- a/phantom-finagle/src/main/scala/com/outworkers/phantom/finagle/package.scala +++ b/phantom-finagle/src/main/scala/com/outworkers/phantom/finagle/package.scala @@ -275,7 +275,7 @@ package object finagle { } } - implicit class ExecutableStatementListAugmenter(val list: ExecutableStatementList) extends AnyVal { + implicit class ExecutableStatementListAugmenter(val list: ExecutableStatementList[Seq]) extends AnyVal { def execute()(implicit session: Session, executor: Executor): Future[Seq[ResultSet]] = { Future.collect(list.queries.map(item => { twitterQueryStringExecuteToFuture(new SimpleStatement(item.terminate.queryString)) diff --git a/project/Publishing.scala b/project/Publishing.scala index e62f8b4a0..6eef1e754 100644 --- a/project/Publishing.scala +++ b/project/Publishing.scala @@ -23,7 +23,7 @@ import scala.util.Properties object Publishing { val defaultPublishingSettings = Seq( - version := "2.6.5" + version := "2.7.0" ) lazy val noPublishSettings = Seq(