Skip to content

Commit

Permalink
Merge pull request #357 from websudos/release/streams
Browse files Browse the repository at this point in the history
Release/streams: Improving support for Reactive Streams.
  • Loading branch information
alexflav23 committed Nov 19, 2015
2 parents 9b34db2 + 246eb3a commit 093176a
Show file tree
Hide file tree
Showing 31 changed files with 332 additions and 144 deletions.
11 changes: 7 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,15 @@ This is a list of companies that have embraced phantom as part of their technolo
- [CreditSuisse](https://www.credit-suisse.com/global/en/)
- [ING](http://www.ing.com/en.htm)
- [Wincor Nixdorf](http://www.wincor-nixdorf.com/internet/site_EN/EN/Home/homepage_node.html)
- [Mobli](https://www.mobli.com/)
- [Pellucid Analytics](http://www.pellucid.com/)
- [Equens](http://www.equens.com/)
- [websudos](https://www.websudos.com/)
- [VictorOps](http://www.victorops.com/)
- [Socrata](http://www.socrata.com)
- [Sphonic](http://www.sphonic.com/)
- [Anomaly42](http://www.anomaly42.com/)

- [Tecsisa](http://www.tecsisa.com/en/)

<a id="contributors">Contributors</a>
=====================================
Expand All @@ -94,15 +95,17 @@ Phantom was developed at websudos as an in-house project. All Cassandra integrat
Scala/Cassandra users in the world rely on phantom.

* Flavian Alexandru ([@alexflav23](https://github.com/alexflav23)) - maintainer
* Viktor Taranenko ([@viktortnk](https://github.com/viktortnk))
* Benjamin Edwards ([@benjumanji](https://github.com/benjumanji)
* Jens Halm ([@jenshalm](https://github.com/jenshalm))
* Bartosz Jankiewicz ([@bjankie1](https://github.com/bjankie1))
* Benjamin Edwards ([@benjumanji](https://github.com/benjumanji)
* Eugene Zhulenev ([@ezhulenev](https://github.com/ezhulenev))
* Juan José Vázquez ([@juanjovazquez](https://github.com/juanjovazquez))
* Viktor Taranenko ([@viktortnk](https://github.com/viktortnk))
* Jens Halm ([@jenshalm](https://github.com/jenshalm))
* Stephen Samuel ([@sksamuel](https://github.com/sksamuel))
* Tomasz Perek ([@tperek](https://github.com/tperek))
* Evan Chan ([@evanfchan](https://github.com/evanfchan))


<a id="copyright">Copyright</a>
===============================
<a href="#table-of-contents">back to top</a>
Expand Down
14 changes: 13 additions & 1 deletion changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ Changelog
<li><a href="#version-1.12.0">1.12.0 - 03.09.2015</a></li>
<li><a href="#version-1.13.0">1.13.0 - 02.11.2015</a></li>
<li><a href="#version-1.15.0">1.15.0 - 10.11.2015</a></li>
<li><a href="#version-1.16.0">1.16.0 - 19.11.2015</a></li>
</ul>


Expand Down Expand Up @@ -223,4 +224,15 @@ and `org.joda.time.DateTime`.
================================

- Added support for Reactive Streams in phantom, with a default mapper subscriber.
- Implemented a publisher for reactive streams based on Play Enumerators convertions.
- Implemented a publisher for reactive streams based on Play Enumerators convertions.

<a id="version-1.16.0">1.16.0</a>
================================

- Fixed buffer overflow issue in Reactive streams.
- Added tests for the reactive streams publisher.
- Added support for binding in INSERT JSON clauses.
- Forcing request builders in reactive streams to be `Batchable`.
- Allowing elements to be derived from `Throwable` by adding an `ErrorWrapper` to error propagation in actors.
- Adding support for binding `scala.Enumeration#Value` in prepared statements.
- Bumped `xsbt-web-plugin` to `2.0.4`.
10 changes: 10 additions & 0 deletions phantom-container-test/project/plugins.sbt
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
resolvers ++= Seq(
"Sonatype snapshots" at "http://oss.sonatype.org/content/repositories/snapshots/",
"jgit-repo" at "http://download.eclipse.org/jgit/maven",
"Twitter Repo" at "http://maven.twttr.com/",
"sonatype-releases" at "https://oss.sonatype.org/content/repositories/releases/",
Resolver.bintrayRepo("websudos", "oss-releases"),
Resolver.url("scoverage-bintray", url("https://dl.bintray.com/sksamuel/sbt-plugins/"))(Resolver.ivyStylePatterns)
)

addSbtPlugin("com.earldouglas" % "xsbt-web-plugin" % "2.0.4")
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package com.websudos.phantom.server;

object JettyLauncher {
lazy val port = if (System.getenv("PORT") != null) System.getenv("PORT") else "8900"

private[this] val started = new AtomicBoolean(false)

def startEmbeddedJetty() {
if (started.compareAndSet(false, true)) {
val server = new Server(port.toInt)
val context = new WebAppContext()
context setContextPath "/"
context.setResourceBase("src/main/webapp")
context.setInitParameter(ScalatraListener.LifeCycleKey, "com.websudos.phantom.server.ScalatraBootstrap")
context.addEventListener(new ScalatraListener)
context.addServlet(classOf[DefaultServlet], "/")

server.setHandler(context)
server.start()
}
}
}
Empty file.
Original file line number Diff line number Diff line change
Expand Up @@ -43,24 +43,22 @@ import scala.concurrent.duration._
import scala.reflect.runtime.universe.Symbol
import scala.reflect.runtime.{currentMirror => cm, universe => ru}


/**
* Main representation of a Cassandra table.
* @tparam T Type of this table.
* @tparam R Type of record.
*/
abstract class CassandraTable[T <: CassandraTable[T, R], R] extends SelectTable[T, R] { self =>


private[phantom] def insertSchema()(implicit session: Session, keySpace: KeySpace): Unit = {
Await.ready(create.ifNotExists().future(), 3.seconds)
}

private[phantom] def self: T = this.asInstanceOf[T]

private[this] lazy val _columns: MutableArrayBuffer[AbstractColumn[_]] = new MutableArrayBuffer[AbstractColumn[_]]
protected[this] lazy val _columns: MutableArrayBuffer[AbstractColumn[_]] = new MutableArrayBuffer[AbstractColumn[_]]

private[this] lazy val _name: String = {
protected[this] lazy val _name: String = {
cm.reflect(this).symbol.name.toTypeName.decodedName.toString
}

Expand Down Expand Up @@ -215,7 +213,4 @@ abstract class CassandraTable[T <: CassandraTable[T, R], R] extends SelectTable[
}
}




private[phantom] case object Lock
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
*/
package com.websudos.phantom.builder.ops

import java.util.Date
import java.util.{UUID, Date}

import com.websudos.phantom.CassandraTable
import com.websudos.phantom.builder.QueryBuilder
Expand All @@ -47,6 +47,20 @@ sealed class DateOfOperator extends Operator {
def apply[T <: CassandraTable[T, R], R](pf: TimeUUIDColumn[T, R]): OperatorClause.Condition = {
new OperatorClause.Condition(QueryBuilder.Select.dateOf(pf.name))
}

def apply(uuid: UUID): OperatorClause.Condition = {
new OperatorClause.Condition(QueryBuilder.Select.dateOf(uuid.toString))
}

def apply(op: OperatorClause.Condition): OperatorClause.Condition = {
new OperatorClause.Condition(QueryBuilder.Select.dateOf(op.qb.queryString))
}
}

sealed class NowOperator extends Operator {
def apply(): OperatorClause.Condition = {
new OperatorClause.Condition(QueryBuilder.Select.now())
}
}

sealed class MaxTimeUUID extends Operator {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,15 +57,29 @@ trait ExecutableStatement extends CassandraOperations {
*/
def parameters: Seq[Any] = Nil

protected[this] def flattenOpt(parameters: Seq[Any]): Seq[Any] = {
/**
* Cleans up the series of parameters passed to the bind query to match
* the codec registry collection that the Java Driver has internally.
*
* If the type of the object passed through to the driver doesn't match a known type for the specific Cassandra column
* type, then the driver will crash with an error.
*
* There are known associations of (Cassandra Column Type -> Java Type) that we need to provide for binding to work.
*
* @param parameters The sequence of parameters to bind.
* @return A clansed set of parameters.
*/
protected[this] def flattenOpt(parameters: Seq[Any]): Seq[AnyRef] = {
//noinspection ComparingUnrelatedTypes
parameters map {
case x if x.isInstanceOf[Some[_]] => x.asInstanceOf[Some[Any]].get
case x if x.isInstanceOf[List[_]] => x.asInstanceOf[List[Any]].asJava
case x if x.isInstanceOf[Set[_]] => x.asInstanceOf[Set[Any]].asJava
case x if x.isInstanceOf[Map[_, _]] => x.asInstanceOf[Map[Any, Any]].asJava
case x if x.isInstanceOf[DateTime] => x.asInstanceOf[DateTime].toDate
case x if x.isInstanceOf[Enumeration#Value] => x.asInstanceOf[Enumeration#Value].toString
case x => x
}
} map(_.asInstanceOf[AnyRef])
}

def baseStatement()(implicit session: Session): Statement = {
Expand All @@ -75,7 +89,7 @@ trait ExecutableStatement extends CassandraOperations {
case someParameters => {
Manager.logger.debug("Executing prepared statement " + qb.queryString + " with bind params " + parameters.mkString(", "))

val params = flattenOpt(parameters).map(_.asInstanceOf[AnyRef])
val params = flattenOpt(parameters)

session.prepare(qb.queryString).bind(params: _*)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ class InsertQuery[
) extends ExecutableStatement with Batchable {

final def json(value: String): InsertJsonQuery[Table, Record, Status, PS] = {

new InsertJsonQuery(
table = table,
init = QueryBuilder.Insert.json(init, value),
Expand All @@ -68,6 +67,17 @@ class InsertQuery[
)
}

final def json(value: PrepareMark): InsertJsonQuery[Table, Record, Status, String :: PS] = {
new InsertJsonQuery(
table = table,
init = QueryBuilder.Insert.json(init, value.qb.queryString),
usingPart = usingPart,
lightweightPart = lightweightPart,
consistencyLevel = consistencyLevel,
parameters = parameters
)
}

final def value[RR](col: Table => AbstractColumn[RR], value: RR) : InsertQuery[Table, Record, Status, PS] = {
new InsertQuery(
table,
Expand Down Expand Up @@ -237,9 +247,24 @@ class InsertJsonQuery[
override val parameters: Seq[Any] = Seq.empty
) extends ExecutableStatement with Batchable {

override def qb: CQLQuery = {
override val qb: CQLQuery = {
(usingPart merge lightweightPart) build init
}

final def bind[V1 <: Product, VL1 <: HList, Reversed <: HList](v1: V1)(
implicit rev: Reverse.Aux[PS, Reversed],
gen: Generic.Aux[V1, VL1],
ev: VL1 =:= Reversed
) : InsertJsonQuery[Table, Record, Status, PS] = {
new InsertJsonQuery(
table,
init,
usingPart,
lightweightPart,
consistencyLevel = consistencyLevel,
parameters = v1.productIterator.toSeq
)
}
}


Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,10 @@ private[builder] class SelectQueryBuilder {
CQLQuery(CQLSyntax.Selection.DateOf).wrapn(column)
}

def now(): CQLQuery = {
CQLQuery("now()")
}

def maxTimeuuid(dateString: String): CQLQuery = {
CQLQuery(CQLSyntax.Selection.MaxTimeUUID).wrapn(dateString)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,11 @@
*/
package com.websudos.phantom.column

import scala.util.{Failure, Success, Try}
import com.datastax.driver.core.Row
import com.websudos.phantom.CassandraTable

import scala.util.{Failure, Success, Try}


abstract class Column[Owner <: CassandraTable[Owner, Record], Record, T](val table: CassandraTable[Owner, Record]) extends AbstractColumn[T] {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ import com.websudos.phantom.builder.primitives.{DefaultPrimitives, Primitive}
import com.websudos.phantom.builder.query.{CQLQuery, CreateImplicits, SelectImplicits}
import com.websudos.phantom.builder.syntax.CQLSyntax


package object dsl extends ImplicitMechanism with CreateImplicits with DefaultPrimitives with SelectImplicits with Operators {

type CassandraTable[Owner <: CassandraTable[Owner, Record], Record] = com.websudos.phantom.CassandraTable[Owner, Record]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package com.websudos.phantom

import java.util.UUID

import com.websudos.phantom.dsl._
import net.liftweb.json.JsonAST.{JString, JValue}
import net.liftweb.json.{Formats, MappingException, Serializer, TypeInfo}

import scala.util.control.NonFatal

class DateTimeSerializer extends Serializer[DateTime] {
private val Class = classOf[UUID]

def deserialize(implicit format: Formats): PartialFunction[(TypeInfo, JValue), DateTime] = {
case (TypeInfo(Class, _), json) => json match {
case JString(value) => try {
DateTimeIsPrimitive.fromString(value)
} catch {
case NonFatal(err) => {
val exception = new MappingException(s"Couldn't extract an UUID from $value")
exception.initCause(err)
throw exception
}
}
case x => throw new MappingException("Can't convert " + x + " to UUID")
}
}

def serialize(implicit format: Formats): PartialFunction[Any, JValue] = {
case x: DateTime => JString(DateTimeIsPrimitive.asCql(x))
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,11 @@ trait Test {
case class CustomRecord(name: String, mp: Map[String, String])

trait TestTableNames extends CassandraTable[TestTableNames, CustomRecord] {
object record extends StringColumn(this) with PartitionKey[String]
object rec extends StringColumn(this) with PartitionKey[String]
object sampleLongTextColumnDefinition extends MapColumn[TestTableNames, CustomRecord, String, String](this)

override def fromRow(r: Row): CustomRecord = {
CustomRecord(record(r), sampleLongTextColumnDefinition(r))
CustomRecord(rec(r), sampleLongTextColumnDefinition(r))
}
}

Expand All @@ -75,7 +75,7 @@ class ClassNameExtraction extends FlatSpec with Matchers {


it should "correctly name objects inside record classes " in {
TestTableNames.record.name shouldEqual "record"
TestTableNames.rec.name shouldEqual "rec"
}

it should "correctly extract long object name definitions in nested record classes" in {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ package com.websudos.phantom.builder.query.db.crud

import java.util.UUID

import com.websudos.phantom.DateTimeSerializer
import com.websudos.phantom.dsl._
import com.websudos.phantom.tables.{MyTest, MyTestRow, Primitive, Primitives, Recipe, Recipes, TestRow, TestTable}
import com.websudos.phantom.testkit._
Expand All @@ -44,30 +45,6 @@ import com.websudos.phantom.builder.primitives.DefaultPrimitives

import scala.util.control.NonFatal

sealed class DateTimeSerializer extends Serializer[DateTime] with DefaultPrimitives {
private val Class = classOf[UUID]

def deserialize(implicit format: Formats): PartialFunction[(TypeInfo, JValue), DateTime] = {
case (TypeInfo(Class, _), json) => json match {
case JString(value) => try {
DateTimeIsPrimitive.fromString(value)
} catch {
case NonFatal(err) => {
val exception = new MappingException(s"Couldn't extract an UUID from $value")
exception.initCause(err)
throw exception
}
}
case x => throw new MappingException("Can't convert " + x + " to UUID")
}
}

def serialize(implicit format: Formats): PartialFunction[Any, JValue] = {
case x: DateTime => JString(DateTimeIsPrimitive.asCql(x))
}
}


class InsertTest extends PhantomCassandraTestSuite {

implicit val s: PatienceConfiguration.Timeout = timeout(10 seconds)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,10 @@ object ByteBufferTable extends ByteBufferTable with PhantomCassandraConnector {
def getById(id: UUID): Future[Option[BufferRecord]] = {
select.where(_.id eqs id).one()
}






}
Loading

0 comments on commit 093176a

Please sign in to comment.