
Commit

Merge branch 'release/2.3.1'
seratch committed Jan 19, 2015
2 parents f41dcf3 + 19b4561 commit a3e1272
Showing 49 changed files with 410 additions and 297 deletions.
6 changes: 4 additions & 2 deletions .travis.yml
@@ -4,7 +4,7 @@ jdk:
- oraclejdk8

scala:
- 2.11.2
- 2.11.4

addons:
postgresql: "9.3"
@@ -14,4 +14,6 @@ before_script:
- psql -c "CREATE USER octoparts_app WITH PASSWORD '';" -U postgres
- psql -c "GRANT ALL PRIVILEGES ON DATABASE octoparts_test to octoparts_app;" -U postgres

script: "sbt coveralls test"
script: "sbt clean coverage test"

after_success: "sbt coverageAggregate coveralls"
4 changes: 3 additions & 1 deletion app/com/m3/octoparts/aggregator/handler/Handler.scala
@@ -1,5 +1,6 @@
package com.m3.octoparts.aggregator.handler

import com.m3.octoparts.aggregator.PartRequestInfo
import com.m3.octoparts.model.PartResponse
import com.m3.octoparts.model.config.ShortPartParam

@@ -21,5 +22,6 @@ trait Handler {
// Used primarily for creating a PartResponse, but also for logging purposes
def partId: String

def process(arguments: HandlerArguments): Future[PartResponse]
def process(partRequestInfo: PartRequestInfo, arguments: HandlerArguments): Future[PartResponse]

}
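
Not part of the diff: a minimal sketch of a handler written against the new signature. NoopHandler is hypothetical, and it assumes (as the call site in PartRequestServiceBase further down suggests) that HandlerArguments is the trait's alias for Map[ShortPartParam, Seq[String]] and that PartResponse's remaining constructor fields have defaults.

package com.m3.octoparts.aggregator.handler

import scala.concurrent.Future

import com.m3.octoparts.aggregator.PartRequestInfo
import com.m3.octoparts.model.PartResponse

// Hypothetical handler, for illustration only.
class NoopHandler(val partId: String) extends Handler {

  // The PartRequestInfo is now threaded through to every handler, so an
  // implementation can use request metadata when building its response
  // (HttpPartRequestHandler below uses it to attach tracing headers).
  def process(partRequestInfo: PartRequestInfo, arguments: HandlerArguments): Future[PartResponse] =
    Future.successful(PartResponse(partId, partRequestInfo.partRequestId))
}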
app/com/m3/octoparts/aggregator/handler/HttpPartRequestHandler.scala
@@ -2,6 +2,7 @@ package com.m3.octoparts.aggregator.handler

import java.net.{ URI, URLEncoder }

import com.m3.octoparts.aggregator.PartRequestInfo
import com.m3.octoparts.http._
import com.m3.octoparts.hystrix._
import com.m3.octoparts.model.{ HttpMethod, PartResponse }
@@ -47,12 +48,13 @@ trait HttpPartRequestHandler extends Handler {
* contain a Failure instead of Success. Make sure to transform with
* .recover
*
* @param partRequestInfo info about the request, used for generating HTTP headers for request tracing
* @param hArgs Preparsed HystrixArguments
* @return Future[PartResponse]
*/
def process(hArgs: HandlerArguments): Future[PartResponse] = {
def process(partRequestInfo: PartRequestInfo, hArgs: HandlerArguments): Future[PartResponse] = {
hystrixExecutor.future {
createBlockingHttpRetrieve(hArgs).retrieve()
createBlockingHttpRetrieve(partRequestInfo, hArgs).retrieve()
}.map {
createPartResponse
}
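
The Scaladoc above tells callers to handle failures with .recover; a hedged sketch of that caller-side recovery (SafeProcessing and the error wording are made up, and PartResponse's other fields are again assumed to have defaults):

import scala.concurrent.{ ExecutionContext, Future }
import scala.util.control.NonFatal

import com.m3.octoparts.aggregator.PartRequestInfo
import com.m3.octoparts.aggregator.handler.Handler
import com.m3.octoparts.model.PartResponse
import com.m3.octoparts.model.config.ShortPartParam

// Hypothetical helper, for illustration only.
object SafeProcessing {
  def processSafely(handler: Handler, info: PartRequestInfo, hArgs: Map[ShortPartParam, Seq[String]])(
    implicit ec: ExecutionContext): Future[PartResponse] =
    handler.process(info, hArgs).recover {
      // Turn the failure into an error response instead of letting the exception escape.
      case NonFatal(e) => PartResponse(handler.partId, info.partRequestId, errors = Seq(e.getMessage))
    }
}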
@@ -64,18 +66,27 @@ trait HttpPartRequestHandler extends Handler {
* @param hArgs Handler arguments
* @return a command object that will perform an HTTP request on demand
*/
def createBlockingHttpRetrieve(hArgs: HandlerArguments): BlockingHttpRetrieve = {
def createBlockingHttpRetrieve(partRequestInfo: PartRequestInfo, hArgs: HandlerArguments): BlockingHttpRetrieve = {
new BlockingHttpRetrieve {
val httpClient = handler.httpClient
def method = httpMethod
val uri = new URI(buildUri(hArgs))
val maybeBody = hArgs.collectFirst {
case (p, values) if p.paramType == ParamType.Body && values.nonEmpty => values.head
}
val headers = collectHeaders(hArgs)
val headers = collectHeaders(hArgs) ++ buildTracingHeaders(partRequestInfo)
}
}

private def buildTracingHeaders(partRequestInfo: PartRequestInfo): Seq[(String, String)] = {
import HttpPartRequestHandler._
Seq(
AggregateRequestIdHeader -> partRequestInfo.requestMeta.id,
PartRequestIdHeader -> partRequestInfo.partRequestId,
PartIdHeader -> partRequestInfo.partRequest.partId
)
}

/**
* Transforms a HttpResponse case class into a PartResponse
* @param httpResp HttpResponse
@@ -141,7 +152,8 @@ trait HttpPartRequestHandler extends Handler {
maybeParamsVal.getOrElse("")
}
val kvs = for {
(p, values) <- hArgs if p.paramType == ParamType.Query
// toSeq because we don't want the result to be a map (with unique keys)
(p, values) <- hArgs.toSeq if p.paramType == ParamType.Query
v <- values
} yield p.outputName -> v
baseUri.addParams(kvs.toSeq)
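
The in-line comment above is the point of this hunk: iterating over the Map directly would rebuild a Map keyed by outputName, silently dropping repeated query parameters. A standalone sketch of the difference, using plain String keys instead of the project's ShortPartParam:

// Standalone illustration, not project code.
val params = Map("q" -> Seq("scala", "octoparts"), "page" -> Seq("1"))

// Yielding from the Map rebuilds a Map, so only the last value per key survives:
val collapsed = for { (k, vs) <- params; v <- vs } yield k -> v
// collapsed == Map("q" -> "octoparts", "page" -> "1")

// Converting to a Seq first keeps every (key, value) pair, which is what a
// query string with repeated parameters needs:
val preserved = for { (k, vs) <- params.toSeq; v <- vs } yield k -> v
// preserved == Seq(("q", "scala"), ("q", "octoparts"), ("page", "1"))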
@@ -157,4 +169,10 @@ trait HttpPartRequestHandler extends Handler {
private def interpolate(stringToInterpolate: String)(replacer: String => String) =
PlaceholderReplacer.replaceAllIn(stringToInterpolate, { m => replacer(m.group(1)) })

}

object HttpPartRequestHandler {
val AggregateRequestIdHeader = "X-OCTOPARTS-PARENT-REQUEST-ID"
val PartRequestIdHeader = "X-OCTOPARTS-REQUEST-ID"
val PartIdHeader = "X-OCTOPARTS-PART-ID"
}
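
To make the new request tracing concrete: the header names come from the companion object above; the literal values below are made up.

import com.m3.octoparts.aggregator.handler.HttpPartRequestHandler._

// Hypothetical values; the right-hand sides stand in for requestMeta.id,
// partRequestId and partRequest.partId as used in buildTracingHeaders.
val exampleTracingHeaders: Seq[(String, String)] = Seq(
  AggregateRequestIdHeader -> "req-123",
  PartRequestIdHeader -> "part-456",
  PartIdHeader -> "userProfile"
)
// These pairs are appended to collectHeaders(hArgs) in createBlockingHttpRetrieve,
// so each outgoing HTTP call can be correlated with the aggregate request and the
// individual part request that triggered it.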
app/com/m3/octoparts/aggregator/service/PartRequestServiceBase.scala
@@ -1,12 +1,11 @@
package com.m3.octoparts.aggregator.service

import com.beachape.logging.LTSVLogger
import com.m3.octoparts.aggregator.PartRequestInfo
import com.m3.octoparts.aggregator.handler.HttpHandlerFactory
import com.m3.octoparts.model.PartResponse
import com.m3.octoparts.model.config.{ HttpPartConfig, ShortPartParam }
import com.m3.octoparts.repository.ConfigsRepository
import skinny.logging.Logging
import skinny.util.LTSV

import scala.concurrent.{ ExecutionContext, Future }

@@ -16,7 +15,7 @@ import scala.concurrent.{ ExecutionContext, Future }
* Implements the basic shared methods but leaves some methods to be implemented in
* children classes / decorators
*/
trait PartRequestServiceBase extends RequestParamSupport with Logging {
trait PartRequestServiceBase extends RequestParamSupport {
implicit def executionContext: ExecutionContext

def repository: ConfigsRepository
@@ -50,7 +49,7 @@ trait PartRequestServiceBase extends RequestParamSupport with Logging {
*/
private def unsupported(pReq: PartRequestInfo): Future[PartResponse] = {
val partId = pReq.partRequest.partId
warn(LTSV.dump("Request Id" -> pReq.requestMeta.id, "Requested PartId" -> partId, "Error" -> "not found"))
LTSVLogger.warn("Request Id" -> pReq.requestMeta.id, "Requested PartId" -> partId, "Error" -> "not found")
Future.successful(PartResponse(partId, pReq.partRequestId, errors = Seq(unsupportedMsg(partId))))
}

@@ -82,7 +81,7 @@ trait PartRequestServiceBase extends RequestParamSupport with Logging {
*/
protected def processWithConfig(ci: HttpPartConfig, partRequestInfo: PartRequestInfo, params: Map[ShortPartParam, Seq[String]]): Future[PartResponse] = {
val handler = handlerFactory.makeHandler(ci)
val fResp = handler.process(params)
val fResp = handler.process(partRequestInfo, params)
fResp.map {
resp =>
val respWithId = resp.copy(id = partRequestInfo.partRequestId)
11 changes: 5 additions & 6 deletions app/com/m3/octoparts/cache/PartResponseCachingSupport.scala
@@ -1,13 +1,12 @@
package com.m3.octoparts.cache

import com.beachape.logging.LTSVLogger
import com.m3.octoparts.aggregator.PartRequestInfo
import com.m3.octoparts.aggregator.service.PartRequestServiceBase
import com.m3.octoparts.cache.directive.{ CacheDirective, CacheDirectiveGenerator }
import com.m3.octoparts.model.PartResponse
import com.m3.octoparts.model.config._
import org.apache.http.HttpStatus
import skinny.logging.Logging
import skinny.util.LTSV
import com.m3.octoparts.cache.RichCacheControl._

import scala.concurrent.Future
@@ -30,7 +29,7 @@ private[cache] object PartResponseCachingSupport {
}
}

trait PartResponseCachingSupport extends PartRequestServiceBase with Logging {
trait PartResponseCachingSupport extends PartRequestServiceBase {
import PartResponseCachingSupport._

def cacheOps: CacheOps
@@ -67,14 +66,14 @@ trait PartResponseCachingSupport extends PartRequestServiceBase with Logging {
case ce: CacheException => {
ce.getCause match {
case te: shade.TimeoutException =>
warn(LTSV.dump("Memcached error" -> "timed out", "cache key" -> ce.key.toString))
LTSVLogger.warn("Memcached error" -> "timed out", "cache key" -> ce.key.toString)
case other =>
error(LTSV.dump("Memcached error" -> other.getClass.getSimpleName, "cache key" -> ce.key.toString), other)
LTSVLogger.error(other, "Memcached error" -> other.getClass.getSimpleName, "cache key" -> ce.key.toString)
}
super.processWithConfig(ci, partRequestInfo, params)
}
case NonFatal(e) => {
error(LTSV.dump("Memcached error" -> e.getClass.getSimpleName), e)
LTSVLogger.error(e, "Memcached error" -> e.getClass.getSimpleName)
super.processWithConfig(ci, partRequestInfo, params)
}
}
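
The same logging migration recurs throughout this release: the skinny Logging trait and LTSV.dump are replaced by com.beachape.logging.LTSVLogger, which takes the key/value pairs directly and, for error logging, the Throwable as its first argument. A condensed sketch of the before/after call shapes (the wrapper object and method names are illustrative):

import com.beachape.logging.LTSVLogger

object CacheErrorLogging {
  // Before: warn(LTSV.dump("Memcached error" -> "timed out", "cache key" -> key))
  def logTimeout(key: String): Unit =
    LTSVLogger.warn("Memcached error" -> "timed out", "cache key" -> key)

  // Before: error(LTSV.dump("Memcached error" -> e.getClass.getSimpleName, "cache key" -> key), e)
  def logFailure(e: Throwable, key: String): Unit =
    LTSVLogger.error(e, "Memcached error" -> e.getClass.getSimpleName, "cache key" -> key)
}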
7 changes: 2 additions & 5 deletions app/com/m3/octoparts/cache/memcached/MemcachedCacheOps.scala
@@ -6,18 +6,15 @@ import com.m3.octoparts.cache.key.{ PartCacheKey, VersionCacheKey }
import com.m3.octoparts.cache.versioning._
import com.m3.octoparts.model.PartResponse
import shade.memcached.Codec
import skinny.logging.Logging

import scala.concurrent.duration.Duration
import scala.concurrent.{ ExecutionContext, Future }
import scala.util.control.NonFatal

class MemcachedCacheOps(
cache: Cache,
latestVersionCache: LatestVersionCache)(implicit executionContext: ExecutionContext)
extends CacheOps
with TtlCalculator
with Logging {
with TtlCalculator {

import com.m3.octoparts.cache.versioning.LatestVersionCache.Version

@@ -48,7 +45,7 @@ class MemcachedCacheOps(
}

def insertPartResponse(cacheKey: PartCacheKey, partResponse: PartResponse, ttl: Option[Duration]): Future[Unit] =
cache.put[String](cacheKey, Json.toJson(partResponse).toString, calculateTtl(partResponse.cacheControl, ttl))
cache.put[String](cacheKey, Json.toJson(partResponse).toString(), calculateTtl(partResponse.cacheControl, ttl))
}

object CombinedVersionLookup {
18 changes: 1 addition & 17 deletions app/com/m3/octoparts/http/InstrumentedHttpClient.scala
@@ -6,7 +6,6 @@ import java.nio.charset.{ Charset, StandardCharsets }
import com.beachape.logging.LTSVLogger
import com.codahale.metrics.{ Gauge, MetricRegistry }
import com.m3.octoparts.OctopartsMetricsRegistry
import com.m3.octoparts.util.TimingSupport
import org.apache.http.{ HttpClientConnection, HttpRequest }
import org.apache.http.client.HttpClient
import org.apache.http.client.config.{ CookieSpecs, RequestConfig }
@@ -16,8 +15,6 @@ import org.apache.http.impl.client.HttpClientBuilder
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager
import org.apache.http.pool.PoolStats
import org.apache.http.protocol.{ HttpContext, HttpRequestExecutor }
import skinny.logging.Logging
import skinny.util.LTSV

import scala.concurrent.duration._

@@ -37,8 +34,6 @@ class InstrumentedHttpClient(
socketTimeout: Duration = 10.seconds,
defaultEncoding: Charset = StandardCharsets.UTF_8)
extends HttpClientLike
with TimingSupport
with Logging
with Closeable {
import InstrumentedHttpClient._

@@ -71,18 +66,7 @@ class InstrumentedHttpClient(
* @param request HttpUriRequest
* @return HttpResponse
*/
def retrieve(request: HttpUriRequest): HttpResponse = {
debug(LTSV.dump("HTTP request" -> request.toString))
time {
httpClient.execute(request, responseHandler)
} {
(httpResponse, duration) =>
debug(LTSV.dump(
"HTTP status" -> httpResponse.status.toString,
"HTTP response time" -> duration.toString
))
}
}
def retrieve(request: HttpUriRequest): HttpResponse = httpClient.execute(request, responseHandler)

/**
* A [[PoolingHttpClientConnectionManager]] which monitors the number of open connections.
8 changes: 4 additions & 4 deletions app/com/m3/octoparts/repository/ConfigImporter.scala
@@ -1,10 +1,10 @@
package com.m3.octoparts.repository

import com.beachape.logging.LTSVLogger
import com.m3.octoparts.model.config.json
import com.m3.octoparts.model.config._
import com.m3.octoparts.repository.config._
import scalikejdbc._
import skinny.util.LTSV

import scala.concurrent.Future

@@ -38,7 +38,7 @@ trait ConfigImporter {
saveWithSession(ThreadPoolConfigRepository, ThreadPoolConfig.fromJsonModel(threadPoolConfig)).map(threadPoolConfig.threadPoolKey -> _)
} {
tpcWasThere =>
debug(LTSV.dump("Thread pool" -> tpcWasThere.threadPoolKey, "Action" -> "Skipping import"))
LTSVLogger.debug("Thread pool" -> tpcWasThere.threadPoolKey, "Action" -> "Skipping import")
Future.successful(threadPoolConfig.threadPoolKey -> tpcWasThere.id.get)
}
}
@@ -63,7 +63,7 @@ trait ConfigImporter {
getWithSession(CacheGroupRepository, sqls.eq(CacheGroupRepository.defaultAlias.id, cacheGroupId)).map(jCacheGroup.name -> _.get)
}
} { cgWasThere =>
debug(LTSV.dump("Cache group" -> cgWasThere.name, "Action" -> "Skipping import"))
LTSVLogger.debug("Cache group" -> cgWasThere.name, "Action" -> "Skipping import")
Future.successful(cgWasThere.name -> cgWasThere)
}
}
@@ -116,7 +116,7 @@ trait ConfigImporter {
* @return the [[HttpPartConfig.partId]] if inserted, else [[None]]
*/
private[repository] def insertConfigIfMissing(jpart: json.HttpPartConfig)(implicit session: DBSession): Future[Option[String]] = {
info(LTSV.dump("part" -> jpart.toString, "action" -> "insert if missing"))
LTSVLogger.info("part" -> jpart.toString, "action" -> "insert if missing")

val oldConfig = getWithSession(HttpPartConfigRepository, sqls.eq(HttpPartConfigRepository.defaultAlias.partId, jpart.partId))
oldConfig.flatMap { mbConfigWasThere =>
19 changes: 9 additions & 10 deletions app/com/m3/octoparts/repository/DBConfigsRepository.scala
@@ -1,14 +1,13 @@
package com.m3.octoparts.repository

import com.beachape.logging.LTSVLogger
import com.m3.octoparts.model.config._
import com.m3.octoparts.repository.config._
import play.api.Play
import play.api.libs.concurrent.Akka
import scalikejdbc._
import skinny.logging.Logging
import skinny.orm.SkinnyCRUDMapper
import skinny.orm.feature.associations.Association
import skinny.util.LTSV
import com.m3.octoparts.future.RichFutureWithTiming._

import scala.concurrent.{ Future, blocking }
@@ -45,7 +44,7 @@ object DBContext {

}

trait ImmutableDBRepository extends ConfigsRepository with Logging {
trait ImmutableDBRepository extends ConfigsRepository {
import DBContext._

// Configs
@@ -81,7 +80,7 @@ trait ImmutableDBRepository extends ConfigsRepository with Logging {
blocking {
val ret = mapper.joins(joins: _*).includes(includes: _*).findBy(where)
ret.foreach {
ci => debug(LTSV.dump("Table" -> mapper.tableName, "where" -> where.toString(), "Data" -> ret.toString))
ci => LTSVLogger.debug("Table" -> mapper.tableName, "where" -> where.toString(), "Data" -> ret.toString)
}
ret
}
@@ -95,7 +94,7 @@ trait ImmutableDBRepository extends ConfigsRepository with Logging {
includes: Seq[Association[A]] = Nil)(implicit session: DBSession = ReadOnlyAutoSession): Future[Seq[A]] = Future {
blocking {
val ret = mapper.joins(joins: _*).includes(includes: _*).findAll()
debug(LTSV.dump("Table" -> mapper.tableName, "Retrieved records" -> ret.length.toString))
LTSVLogger.debug("Table" -> mapper.tableName, "Retrieved records" -> ret.length.toString)
ret
}
}.measure("DB_GET")
@@ -109,13 +108,13 @@ trait ImmutableDBRepository extends ConfigsRepository with Logging {
includes: Seq[Association[A]] = Nil)(implicit session: DBSession = ReadOnlyAutoSession): Future[Seq[A]] = Future {
blocking {
val ret = mapper.joins(joins: _*).includes(includes: _*).findAllBy(where)
debug(LTSV.dump("Table" -> mapper.tableName, "Retrieved records" -> ret.length.toString))
LTSVLogger.debug("Table" -> mapper.tableName, "Retrieved records" -> ret.length.toString)
ret
}
}.measure("DB_GET")
}

trait MutableDBRepository extends MutableConfigsRepository with Logging {
trait MutableDBRepository extends MutableConfigsRepository {
import DBContext._

def save[A <: ConfigModel[A]: ConfigMapper](obj: A): Future[Long] = DB.futureLocalTx { implicit session => saveWithSession(implicitly[ConfigMapper[A]], obj) }
@@ -136,7 +135,7 @@ trait MutableDBRepository extends MutableConfigsRepository with Logging {
private[repository] def saveWithSession[A <: ConfigModel[A]](mapper: ConfigMapper[A], model: A)(implicit session: DBSession = AutoSession): Future[Long] = Future {
blocking {
val id = mapper.save(model)
info(LTSV.dump("Table" -> mapper.tableName, "Data" -> model.toString, "Action" -> "Saved"))
LTSVLogger.info("Table" -> mapper.tableName, "Data" -> model.toString, "Action" -> "Saved")
id
}
}.measure("DB_UPDATE")
@@ -148,7 +147,7 @@ trait MutableDBRepository extends MutableConfigsRepository with Logging {
private[repository] def deleteWithSession(mapper: SkinnyCRUDMapper[_], where: SQLSyntax)(implicit session: DBSession = AutoSession): Future[Int] = Future {
blocking {
val count = mapper.deleteBy(where)
info(LTSV.dump("Table" -> mapper.tableName, "where" -> where.toString(), "count" -> count.toString, "Action" -> "Deleted"))
LTSVLogger.info("Table" -> mapper.tableName, "where" -> where.toString(), "count" -> count.toString, "Action" -> "Deleted")
count
}
}.measure("DB_UPDATE")
@@ -159,7 +158,7 @@ trait MutableDBRepository extends MutableConfigsRepository with Logging {
private[repository] def deleteAllWithSession(mapper: SkinnyCRUDMapper[_])(implicit session: DBSession = AutoSession): Future[Int] = Future {
blocking {
val count = mapper.deleteAll
warn(LTSV.dump("Table" -> mapper.tableName, "count" -> count.toString, "Action" -> "Truncated"))
LTSVLogger.warn("Table" -> mapper.tableName, "count" -> count.toString, "Action" -> "Truncated")
count
}
}.measure("DB_UPDATE")
(The remaining changed files in this commit are not shown here.)