Skip to content

Commit

Permalink
Merge branch 'release/2.1'
Browse files Browse the repository at this point in the history
  • Loading branch information
Chris Birchall committed Sep 26, 2014
2 parents 4e79eda + 31514f4 commit 0fbfc3d
Show file tree
Hide file tree
Showing 131 changed files with 2,588 additions and 879 deletions.
5 changes: 3 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@ jdk:

scala:
- 2.11.2
- 2.10.4

addons:
postgresql: "9.3"

before_script:
- psql -c "CREATE DATABASE octoparts_test WITH ENCODING 'UTF8';" -U postgres
- psql -c "CREATE USER octoparts_app WITH PASSWORD '';" -U postgres
- psql -c "GRANT ALL PRIVILEGES ON DATABASE octoparts_test to octoparts_app;" -U postgres
- psql -c "GRANT ALL PRIVILEGES ON DATABASE octoparts_test to octoparts_app;" -U postgres

script: "sbt coveralls"
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
# Octoparts
# Octoparts [![Build Status](https://travis-ci.org/m3dev/octoparts.svg?branch=develop)](https://travis-ci.org/m3dev/octoparts) [![Coverage Status](https://coveralls.io/repos/m3dev/octoparts/badge.png?branch=develop)](https://coveralls.io/r/m3dev/octoparts?branch=develop)

[See documentation](http://m3dev.github.io/octoparts/)

Also see these [Lightning talk slides](https://docs.google.com/presentation/d/1dgbLSaEyWydGX6SaPtGeKXX-6p-0OuUvnWFpiOqgoQo/edit?usp=sharing) from ScalaMatsuri 2014 for a quick explanation of what Octoparts is all about.
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import java.net.{ URI, URLEncoder }

import com.m3.octoparts.http.{ HttpResponse, _ }
import com.m3.octoparts.hystrix._
import com.m3.octoparts.model.PartResponse
import com.m3.octoparts.model.{ HttpMethod, PartResponse }
import com.m3.octoparts.model.config._
import com.netaporter.uri.Uri
import com.netaporter.uri.dsl._
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package com.m3.octoparts.aggregator.handler

import com.m3.octoparts.http.HttpMethod.Get
import com.m3.octoparts.http._
import com.m3.octoparts.hystrix._
import com.m3.octoparts.model.HttpMethod
import com.m3.octoparts.model.config._

import scala.concurrent.ExecutionContext
Expand All @@ -19,7 +19,7 @@ class SimpleHttpPartRequestHandler(
val partId: String,
val httpClient: HttpClientLike,
val uriToInterpolate: String,
val httpMethod: HttpMethod.Value = Get,
val httpMethod: HttpMethod.Value = HttpMethod.Get,
val additionalValidStatuses: Set[Int] = Set.empty,
val registeredParams: Set[PartParam] = Set.empty,
val hystrixExecutor: HystrixExecutor)(implicit val executionContext: ExecutionContext)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
package com.m3.octoparts.aggregator.service

import com.m3.octoparts.aggregator.handler.HttpHandlerFactory
import com.m3.octoparts.cache.PartResponseCachingSupport
import com.m3.octoparts.cache.client.CacheClient
import com.m3.octoparts.cache.{ CacheOps, PartResponseCachingSupport }
import com.m3.octoparts.logging.PartRequestLogger
import com.m3.octoparts.repository.ConfigsRepository
import scaldi.Module
Expand All @@ -17,7 +16,7 @@ class AggregatorServicesModule extends Module {
inject[ConfigsRepository],
inject[HttpHandlerFactory]
) with PartResponseCachingSupport {
val cacheClient = inject[CacheClient]
val cacheOps = inject[CacheOps]
}

bind[PartsService] to new PartsService(
Expand Down
4 changes: 2 additions & 2 deletions app/com/m3/octoparts/aggregator/service/PartsService.scala
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class PartsService(partRequestService: PartRequestServiceBase,
*/
def processParts(aggregateRequest: AggregateRequest, noCache: Boolean = false): Future[AggregateResponse] = {
val requestMeta = aggregateRequest.requestMeta
val aReqTimeout = requestMeta.timeoutMs.fold(maximumAggReqTimeout)(_.toInt millis) min maximumAggReqTimeout
val aReqTimeout = requestMeta.timeout.getOrElse(maximumAggReqTimeout) min maximumAggReqTimeout
val partsResponsesFutures = aggregateRequest.requests.map {
pReq =>
val partRequestInfo = PartRequestInfo(requestMeta, pReq, noCache)
Expand All @@ -57,7 +57,7 @@ class PartsService(partRequestService: PartRequestServiceBase,
}
Future.sequence(partsResponsesFutures).timeAndTransform {
(partsResponses, duration) =>
val responseMeta = ResponseMeta(requestMeta.id, duration.toMillis)
val responseMeta = ResponseMeta(requestMeta.id, duration)
val aggregateResponse = AggregateResponse(responseMeta, partsResponses)

if (Logger.isDebugEnabled) {
Expand Down
22 changes: 22 additions & 0 deletions app/com/m3/octoparts/cache/Cache.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package com.m3.octoparts.cache

import com.m3.octoparts.cache.key._
import shade.memcached.Codec

import scala.concurrent.Future
import scala.concurrent.duration._

/**
* A cache whose values are keyed using [[com.m3.octoparts.cache.key.CacheKey]]s.
*
* Usually wraps an instance of a [[RawCache]].
*/
trait Cache {

// TODO shade dependency decoupling

def get[T](key: CacheKey)(implicit codec: Codec[T]): Future[Option[T]]

def put[T](key: CacheKey, v: T, ttl: Option[Duration])(implicit codec: Codec[T]): Future[Unit]

}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.m3.octoparts.cache.client
package com.m3.octoparts.cache

import com.m3.octoparts.cache.key.CacheKey

Expand Down
24 changes: 12 additions & 12 deletions app/com/m3/octoparts/cache/CacheModule.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ package com.m3.octoparts.cache
import java.util.concurrent.{ TimeUnit, _ }

import com.google.common.util.concurrent.ThreadFactoryBuilder
import com.m3.octoparts.cache.client._
import com.m3.octoparts.cache.dummy.{ NoCacheAccessor, NoCacheAdapter, NoCacheClient, NoLatestVersionCache }
import com.m3.octoparts.cache.memcached._
import com.m3.octoparts.cache.dummy.{ DummyCache, DummyRawCache, DummyCacheOps, DummyLatestVersionCache }
import com.m3.octoparts.cache.key.MemcachedKeyGenerator
import com.m3.octoparts.cache.versioning.{ InMemoryLatestVersionCache, LatestVersionCache }
import play.api.Configuration
Expand All @@ -28,7 +28,7 @@ class CacheModule extends Module {
new ThreadPoolExecutor(0, poolSize, 1L, TimeUnit.MINUTES, queue, namedThreadFactory))
}

private def buildMemcached(): Memcached = {
private def buildMemcachedRawCache(): RawCache = {
val playConfig = inject[Configuration]
val tsConfig = playConfig.underlying

Expand All @@ -48,27 +48,27 @@ class CacheModule extends Module {
authentication = auth
), cacheExecutor)

new LoggingMemcachedWrapper(shade)(cacheExecutor)
new LoggingRawCache(new MemcachedRawCache(shade))(cacheExecutor)
}

when(cachingEnabled) {
bind[LatestVersionCache] to new InMemoryLatestVersionCache
when(useInMemoryCache) {
bind[Memcached] to new InMemoryCacheAdapter()(cacheExecutor) destroyWith (_.close())
bind[RawCache] to new InMemoryRawCache()(cacheExecutor) destroyWith (_.close())
}
when(useMemcached) {
bind[Memcached] to buildMemcached() destroyWith (_.close())
bind[RawCache] to buildMemcachedRawCache() destroyWith (_.close())
}
bind[MemcachedKeyGenerator] to MemcachedKeyGenerator
bind[CacheAccessor] to injected[MemcachedAccessor]
bind[CacheClient] to injected[MemcachedClient]
bind[Cache] to injected[MemcachedCache]
bind[CacheOps] to injected[MemcachedCacheOps]
}

when(cachingDisabled) {
bind[LatestVersionCache] to NoLatestVersionCache
bind[Memcached] to NoCacheAdapter
bind[CacheAccessor] to NoCacheAccessor
bind[CacheClient] to NoCacheClient
bind[LatestVersionCache] to DummyLatestVersionCache
bind[RawCache] to DummyRawCache
bind[Cache] to DummyCache
bind[CacheOps] to DummyCacheOps
}

def cachingDisabled = {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.m3.octoparts.cache.client
package com.m3.octoparts.cache

import com.m3.octoparts.cache.directive.CacheDirective
import com.m3.octoparts.cache.versioning._
Expand All @@ -7,15 +7,24 @@ import com.m3.octoparts.model.PartResponse
import scala.concurrent.Future

/**
* The client that performs actual caching operations
* Operations related to caching of [[com.m3.octoparts.model.PartResponse]]s.
*/
trait CacheClient {
trait CacheOps {

/**
* Increase the cache version of the given part ID,
* effectively invalidating all cached PartResponses for this part.
*/
def increasePartVersion(partId: String): Future[Unit]

/**
* Increase the cache version of the given (partId, parameter name, parameter value) combination,
* effectively invalidating all cached PartResponses for this combination.
*/
def increaseParamVersion(vpk: VersionedParamKey): Future[Unit]

/**
* Lookup the item in the cache. If it is found, return it,
* Lookup the PartResponse in the cache. If it is found, return it,
* otherwise run the provided block, store the result in the cache and return it.
*
* @param directive cache directive
Expand All @@ -25,11 +34,7 @@ trait CacheClient {
def putIfAbsent(directive: CacheDirective)(f: => Future[PartResponse]): Future[PartResponse]

/**
* Inconditional put
*
* @param directive
* @param partResponse
* @return
* Unconditional PUT of a PartResponse
*/
def saveLater(partResponse: PartResponse, directive: CacheDirective): Future[Unit]
}
Expand Down
34 changes: 34 additions & 0 deletions app/com/m3/octoparts/cache/LoggingRawCache.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package com.m3.octoparts.cache

import com.m3.octoparts.logging.LogUtil
import play.api.Logger
import shade.memcached.Codec
import skinny.util.LTSV

import scala.concurrent.duration.Duration
import scala.concurrent.{ ExecutionContext, Future }

/**
* A decorator to add debug logging to a [[RawCache]]
*/
class LoggingRawCache(delegate: RawCache)(implicit executionContext: ExecutionContext) extends RawCache with LogUtil {

def get[T](key: String)(implicit codec: Codec[T]): Future[Option[T]] = {
val f = delegate.get(key)(codec)
f.onSuccess {
case mbVal => Logger.debug(LTSV.dump("Memcached" -> "get", "key" -> key, "is" -> truncateValue(mbVal)))
}
f
}

def set[T](key: String, value: T, exp: Duration)(implicit codec: Codec[T]): Future[Unit] = {
val f = delegate.set(key, value, exp)(codec)
f.onSuccess {
case done => Logger.debug(LTSV.dump("Memcached" -> "set", "key" -> key, "value" -> truncateValue(value), "duration" -> exp.toString))
}
f
}

def close(): Unit = delegate.close()

}
53 changes: 31 additions & 22 deletions app/com/m3/octoparts/cache/PartResponseCachingSupport.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,31 +2,48 @@ package com.m3.octoparts.cache

import com.m3.octoparts.aggregator.PartRequestInfo
import com.m3.octoparts.aggregator.service.PartRequestServiceBase
import com.m3.octoparts.cache.client.{ CacheClient, CacheException }
import com.m3.octoparts.cache.directive.{ CacheDirective, CacheDirectiveGenerator }
import com.m3.octoparts.model.PartResponse
import com.m3.octoparts.model.config._
import org.apache.http.HttpStatus
import skinny.logging.Logging
import skinny.util.LTSV
import com.m3.octoparts.cache.RichCacheControl._

import scala.concurrent.Future

trait PartResponseCachingSupport extends PartRequestServiceBase with Logging {
private[cache] object PartResponseCachingSupport {

/**
* @return The response to use (between the cached one and the new one). Will use new one <=> the new one is not a 304.
*/
private[cache] def selectLatest(newPartResponse: PartResponse, existingPartResponse: PartResponse): PartResponse = {
val is304 = newPartResponse.statusCode.fold(false) {
_.intValue == HttpStatus.SC_NOT_MODIFIED
}
if (is304) existingPartResponse else newPartResponse
}

private[cache] def shouldRevalidate(partResponse: PartResponse): Boolean = {
partResponse.retrievedFromCache && partResponse.cacheControl.shouldRevalidate
}
}

import com.m3.octoparts.cache.RichCacheControl._
trait PartResponseCachingSupport extends PartRequestServiceBase with Logging {
import PartResponseCachingSupport._

def cacheClient: CacheClient
def cacheOps: CacheOps

override def processWithConfig(
ci: HttpPartConfig, partRequestInfo: PartRequestInfo, params: Map[ShortPartParam, String]): Future[PartResponse] = {
override def processWithConfig(ci: HttpPartConfig,
partRequestInfo: PartRequestInfo,
params: Map[ShortPartParam, String]): Future[PartResponse] = {

if (partRequestInfo.noCache || !ci.cacheConfig.cachingEnabled) {
// noCache or TTL defined but 0 => skip caching
super.processWithConfig(ci, partRequestInfo, params)
} else {
val directive = CacheDirectiveGenerator.generateDirective(ci.partId, params, ci.cacheConfig)
val futureMaybeFromCache = cacheClient.putIfAbsent(directive)(super.processWithConfig(ci, partRequestInfo, params)).recoverWith {
val futureMaybeFromCache = cacheOps.putIfAbsent(directive)(super.processWithConfig(ci, partRequestInfo, params)).recoverWith {
case ce: CacheException =>
ce.getCause match {
case te: shade.TimeoutException =>
Expand All @@ -39,7 +56,7 @@ trait PartResponseCachingSupport extends PartRequestServiceBase with Logging {
futureMaybeFromCache.flatMap {
partResponse =>
// at this point, the response may come from cache and be stale.
if (partResponse.retrievedFromCache && partResponse.cacheControl.shouldRevalidate) {
if (shouldRevalidate(partResponse)) {
revalidate(partResponse, directive, ci, partRequestInfo, params)
} else {
Future.successful(partResponse)
Expand All @@ -51,8 +68,11 @@ trait PartResponseCachingSupport extends PartRequestServiceBase with Logging {
}
}

private[cache] def revalidate(
partResponse: PartResponse, directive: CacheDirective, ci: HttpPartConfig, partRequestInfo: PartRequestInfo, params: Map[ShortPartParam, String]): Future[PartResponse] = {
private[cache] def revalidate(partResponse: PartResponse,
directive: CacheDirective,
ci: HttpPartConfig,
partRequestInfo: PartRequestInfo,
params: Map[ShortPartParam, String]): Future[PartResponse] = {

val revalidationParams = partResponse.cacheControl.revalidationHeaders.map {
case (k, v) => ShortPartParam(outputName = k, paramType = ParamType.Header) -> v
Expand All @@ -63,19 +83,8 @@ trait PartResponseCachingSupport extends PartRequestServiceBase with Logging {
selectLatest(revalidatedPartResponse, partResponse)
}
revalidatedFResp.onSuccess {
case latestResponse => cacheClient.saveLater(latestResponse, directive)
case latestResponse => cacheOps.saveLater(latestResponse, directive)
}
revalidatedFResp
}

/**
* @return The response to use (between the cached one and the new one). Will use new one <=> the new one is not a 304.
*/
private[cache] def selectLatest(newPartResponse: PartResponse, existingPartResponse: PartResponse): PartResponse = {
val is304 = newPartResponse.statusCode.fold(false) {
_.intValue == HttpStatus.SC_NOT_MODIFIED
}
if (is304) existingPartResponse else newPartResponse
}

}
38 changes: 38 additions & 0 deletions app/com/m3/octoparts/cache/RawCache.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package com.m3.octoparts.cache

import shade.memcached.Codec

import scala.concurrent.Future
import scala.concurrent.duration.Duration

/**
* A facade for [[shade.memcached.Memcached]].
*
* This is called a "raw" cache because it uses raw Strings for keys.
* Usually it will be wrapped by a [[Cache]] implementation for easier access.
*
* Note: We still have a dependency on [[shade.memcached.Codec]], which is not ideal,
* but we can remove it if/when we decide to migrate away from Shade.
*/
trait RawCache {

/**
* Fetches a value from the cache store.
*
* @return Some(value) in case the key is available, or None otherwise (doesn't throw exception on key missing)
*/
def get[T](key: String)(implicit codec: Codec[T]): Future[Option[T]]

/**
* Sets a (key, value) in the cache store.
*
* The TTL can be Duration.Inf (infinite duration).
*/
def set[T](key: String, value: T, ttl: Duration)(implicit codec: Codec[T]): Future[Unit]

/**
* Shutdown and clean up any resources.
*/
def close(): Unit

}
Loading

0 comments on commit 0fbfc3d

Please sign in to comment.