diff --git a/app/com/m3/octoparts/hystrix/HystrixSetterSupport.scala b/app/com/m3/octoparts/hystrix/HystrixSetterSupport.scala index b99d5929..4a8fa106 100644 --- a/app/com/m3/octoparts/hystrix/HystrixSetterSupport.scala +++ b/app/com/m3/octoparts/hystrix/HystrixSetterSupport.scala @@ -1,6 +1,7 @@ package com.m3.octoparts.hystrix -import com.m3.octoparts.model.config.HystrixConfig +import com.m3.octoparts.model.config.{ ThreadPoolConfig, HystrixConfig } +import com.netflix.hystrix.HystrixThreadPoolProperties.Setter import com.netflix.hystrix._ /** @@ -13,16 +14,23 @@ trait HystrixSetterSupport { * Given a HystrixArgument case class returns a HystrixCommand.Setter object */ def setter(config: HystrixConfig): HystrixCommand.Setter = { - val threadPoolProperties = HystrixThreadPoolProperties.Setter(). - withCoreSize(config.threadPoolConfigItem.coreSize). - withMaxQueueSize(config.threadPoolConfigItem.queueSize) + val threadPoolConfig: ThreadPoolConfig = config.threadPoolConfigItem + val threadPoolProperties = threadPoolSetter(threadPoolConfig) HystrixCommand.Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey(config.commandGroupKey)). andCommandKey(HystrixCommandKey.Factory.asKey(config.commandKey)). - andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey(config.threadPoolConfigItem.threadPoolKey)). + andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey(threadPoolConfig.threadPoolKey)). andThreadPoolPropertiesDefaults(threadPoolProperties). andCommandPropertiesDefaults(HystrixCommandProperties.Setter(). withExecutionTimeoutInMilliseconds(config.timeout.toMillis.toInt). withFallbackEnabled(config.localContentsAsFallback)) } + + private[hystrix] def threadPoolSetter(threadPoolConfig: ThreadPoolConfig): Setter = { + HystrixThreadPoolProperties.Setter(). + withCoreSize(threadPoolConfig.coreSize). + // Hystrix uses both of these for setting Queue size + withMaxQueueSize(threadPoolConfig.queueSize). + withQueueSizeRejectionThreshold(threadPoolConfig.queueSize) + } } diff --git a/app/com/m3/octoparts/model/config/ThreadPoolConfig.scala b/app/com/m3/octoparts/model/config/ThreadPoolConfig.scala index b42514c1..85da4a97 100644 --- a/app/com/m3/octoparts/model/config/ThreadPoolConfig.scala +++ b/app/com/m3/octoparts/model/config/ThreadPoolConfig.scala @@ -13,12 +13,10 @@ case class ThreadPoolConfig( threadPoolKey: String, coreSize: Int = ThreadPoolConfig.defaultCoreSize, hystrixConfigs: Set[HystrixConfig] = Set.empty, + queueSize: Int = ThreadPoolConfig.defaultQueueSize, createdAt: DateTime, updatedAt: DateTime) extends ConfigModel[ThreadPoolConfig] { - // this setting is not yet available for users - def queueSize: Int = ThreadPoolConfig.defaultQueueSize - /** * @return a sorted list of related [[HttpPartConfig]] */ @@ -44,6 +42,7 @@ object ThreadPoolConfig { def fromJsonModel(config: JsonThreadPoolConfig): ThreadPoolConfig = { ThreadPoolConfig( threadPoolKey = config.threadPoolKey, + queueSize = config.queueSize, coreSize = config.coreSize, createdAt = DateTime.now, updatedAt = DateTime.now diff --git a/app/com/m3/octoparts/repository/config/ThreadPoolConfigRepository.scala b/app/com/m3/octoparts/repository/config/ThreadPoolConfigRepository.scala index ceec9383..c84f9f56 100644 --- a/app/com/m3/octoparts/repository/config/ThreadPoolConfigRepository.scala +++ b/app/com/m3/octoparts/repository/config/ThreadPoolConfigRepository.scala @@ -15,7 +15,8 @@ object ThreadPoolConfigRepository extends ConfigMapper[ThreadPoolConfig] with Ti protected val permittedFields = Seq( "threadPoolKey" -> SkinnyParamType.String, - "coreSize" -> SkinnyParamType.Int + "coreSize" -> SkinnyParamType.Int, + "queueSize" -> SkinnyParamType.Int ) lazy val hystrixConfigRef = hasMany[HystrixConfig]( @@ -30,6 +31,7 @@ object ThreadPoolConfigRepository extends ConfigMapper[ThreadPoolConfig] with Ti id = rs.get(n.id), threadPoolKey = rs.get(n.threadPoolKey), coreSize = rs.get(n.coreSize), + queueSize = rs.get(n.queueSize), createdAt = rs.get(n.createdAt), updatedAt = rs.get(n.updatedAt) ) diff --git a/app/controllers/AdminController.scala b/app/controllers/AdminController.scala index b6d34585..cb9f4c53 100644 --- a/app/controllers/AdminController.scala +++ b/app/controllers/AdminController.scala @@ -353,7 +353,7 @@ class AdminController(cacheOps: CacheOps, repository: MutableConfigsRepository)( flashError(routes.AdminController.newThreadPool, Messages("admin.validationErrors", formWithErrors.errors)) } }, { data => - val tpc = ThreadPoolConfig(threadPoolKey = data.threadPoolKey, coreSize = data.coreSize, createdAt = DateTime.now, updatedAt = DateTime.now) + val tpc = ThreadPoolConfig(threadPoolKey = data.threadPoolKey, coreSize = data.coreSize, queueSize = data.queueSize, createdAt = DateTime.now, updatedAt = DateTime.now) saveAndRedirect { repository.save(tpc) }(routes.AdminController.listThreadPools, id => routes.AdminController.showThreadPool(id)) @@ -368,7 +368,7 @@ class AdminController(cacheOps: CacheOps, repository: MutableConfigsRepository)( flashError(routes.AdminController.editThreadPool(id), Messages("admin.validationErrors", formWithErrors.errors)) } }, { data => - val updatedTpc = tpc.copy(threadPoolKey = data.threadPoolKey, coreSize = data.coreSize, updatedAt = DateTime.now) + val updatedTpc = tpc.copy(threadPoolKey = data.threadPoolKey, coreSize = data.coreSize, queueSize = data.queueSize, updatedAt = DateTime.now) saveAndRedirect { repository.save(updatedTpc) }(routes.AdminController.editThreadPool(id), _ => routes.AdminController.showThreadPool(id)) diff --git a/app/controllers/AdminForms.scala b/app/controllers/AdminForms.scala index 96df1f38..d74b6279 100644 --- a/app/controllers/AdminForms.scala +++ b/app/controllers/AdminForms.scala @@ -229,12 +229,13 @@ object AdminForms { )(ParamData.apply)(ParamData.unapply) ) - case class ThreadPoolData(threadPoolKey: String, coreSize: Int) + case class ThreadPoolData(threadPoolKey: String, coreSize: Int, queueSize: Int) val threadPoolForm = Form( mapping( "threadPoolKey" -> text, - "coreSize" -> number + "coreSize" -> number(min = 1), + "queueSize" -> number(min = -1) // Hystrix says that if -1 is used, it will use a synchronous queue )(ThreadPoolData.apply)(ThreadPoolData.unapply) ) diff --git a/app/views/threadpool/edit.scala.html b/app/views/threadpool/edit.scala.html index cf95909b..58282746 100644 --- a/app/views/threadpool/edit.scala.html +++ b/app/views/threadpool/edit.scala.html @@ -1,4 +1,5 @@ @(maybeTpc: Option[com.m3.octoparts.model.config.ThreadPoolConfig])(implicit flash: Flash, navbarLinks: presentation.NavbarLinks, lang: Lang) +@import com.m3.octoparts.model.config.ThreadPoolConfig @title = @{ maybeTpc match { @@ -44,7 +45,18 @@

@title

- + +
+
+ + + +
+ +
+
+ +

@Messages("threadPools.queueSizeExplanation")

diff --git a/app/views/threadpool/list.scala.html b/app/views/threadpool/list.scala.html index bd69e571..8fac0862 100644 --- a/app/views/threadpool/list.scala.html +++ b/app/views/threadpool/list.scala.html @@ -22,6 +22,7 @@ @Messages("threadPools.key") @Messages("threadPools.coreSize") + @Messages("threadPools.queueSize") @Messages("threadPools.members") @Messages("action") @@ -31,6 +32,7 @@ @tpc.threadPoolKey @tpc.coreSize + @tpc.queueSize @tpc.hystrixConfigs.size @Messages("edit") diff --git a/conf/db/migration/default/V6__Add_threadpool_queue.sql b/conf/db/migration/default/V6__Add_threadpool_queue.sql new file mode 100644 index 00000000..3fcc551a --- /dev/null +++ b/conf/db/migration/default/V6__Add_threadpool_queue.sql @@ -0,0 +1,7 @@ +ALTER TABLE thread_pool_config +ADD COLUMN queue_size INTEGER + NOT NULL + DEFAULT 256 + CHECK (queue_size >= -1); + +COMMENT ON COLUMN thread_pool_config.queue_size IS 'size of the execution queue for a specific thread pool'; \ No newline at end of file diff --git a/conf/messages.en b/conf/messages.en index 9471b94e..9223afe8 100644 --- a/conf/messages.en +++ b/conf/messages.en @@ -114,6 +114,7 @@ threadPools.edit=Edit settings for thread pool {0} threadPools.key=Thread pool key threadPools.coreSize=Core size threadPools.queueSize=Queue size +threadPools.queueSizeExplanation=-1 → no queue threadPools.members=Members cacheGroups.this=Cache group diff --git a/conf/messages.ja b/conf/messages.ja index 969b9409..103f7040 100644 --- a/conf/messages.ja +++ b/conf/messages.ja @@ -113,6 +113,7 @@ threadPools.edit=「{0}」スレッドプール設定編集 threadPools.key=スレッドプールキー threadPools.coreSize=コア数 threadPools.queueSize=キュー数 +threadPools.queueSizeExplanation=-1 → キューなし threadPools.members=メンバーパーツ cacheGroups.this=キャッシュグループ diff --git a/project/Version.scala b/project/Version.scala index 6d1e91c7..67875d5d 100644 --- a/project/Version.scala +++ b/project/Version.scala @@ -1,7 +1,7 @@ object Version { - val octopartsVersion = "2.5" + val octopartsVersion = "2.5.1" val theScalaVersion = "2.11.7" } diff --git a/test/com/m3/octoparts/hystrix/HystrixSetterSupportSpec.scala b/test/com/m3/octoparts/hystrix/HystrixSetterSupportSpec.scala new file mode 100644 index 00000000..03ca4639 --- /dev/null +++ b/test/com/m3/octoparts/hystrix/HystrixSetterSupportSpec.scala @@ -0,0 +1,20 @@ +package com.m3.octoparts.hystrix + +import com.m3.octoparts.support.mocks.ConfigDataMocks +import org.scalatest._ + +class HystrixSetterSupportSpec extends FunSpec with Matchers with ConfigDataMocks { + + val subject = new HystrixSetterSupport {} + + describe("#threadPoolSetter") { + + it("should return a Hystrix ThreadPool setter with queue size and queue threshold size set to the config item's queue size") { + val r = subject.threadPoolSetter(mockThreadConfig) + r.getMaxQueueSize shouldBe mockThreadConfig.queueSize + r.getQueueSizeRejectionThreshold shouldBe mockThreadConfig.queueSize + } + + } + +} diff --git a/test/com/m3/octoparts/model/config/JsonConversionSpec.scala b/test/com/m3/octoparts/model/config/JsonConversionSpec.scala index b3ba1656..046c4f5b 100644 --- a/test/com/m3/octoparts/model/config/JsonConversionSpec.scala +++ b/test/com/m3/octoparts/model/config/JsonConversionSpec.scala @@ -149,9 +149,8 @@ class JsonConversionSpec extends FunSpec with Matchers with Checkers with Genera val tpc = ThreadPoolConfig.fromJsonModel(jtpc) tpc.threadPoolKey should be(jtpc.threadPoolKey) tpc.coreSize should be(jtpc.coreSize) - // This queueSize setting is not open yet - tpc.queueSize should be(ThreadPoolConfig.defaultQueueSize) - ThreadPoolConfig.toJsonModel(tpc) should be(jtpc.copy(queueSize = ThreadPoolConfig.defaultQueueSize)) + tpc.queueSize should be(jtpc.queueSize) + ThreadPoolConfig.toJsonModel(tpc) shouldBe jtpc } } @@ -160,11 +159,9 @@ class JsonConversionSpec extends FunSpec with Matchers with Checkers with Genera jhc: json.HystrixConfig => val hc = HystrixConfig.fromJsonModel(jhc) hc.timeout should be(jhc.timeout) - val ejtpc = jhc.threadPoolConfig.copy(queueSize = ThreadPoolConfig.defaultQueueSize) - hc.threadPoolConfig.map(ThreadPoolConfig.toJsonModel) should be(Some(ejtpc)) hc.commandKey should be(jhc.commandKey) hc.commandGroupKey should be(jhc.commandGroupKey) - HystrixConfig.toJsonModel(hc) should be(jhc.copy(threadPoolConfig = ejtpc)) + HystrixConfig.toJsonModel(hc) shouldBe jhc } } @@ -202,7 +199,7 @@ class JsonConversionSpec extends FunSpec with Matchers with Checkers with Genera hpc.uriToInterpolate should be(jhpc.uriToInterpolate) hpc.description should be(jhpc.description) hpc.method should be(jhpc.method) - val ejhc = jhpc.hystrixConfig.copy(threadPoolConfig = jhpc.hystrixConfig.threadPoolConfig.copy(queueSize = ThreadPoolConfig.defaultQueueSize)) + val ejhc = jhpc.hystrixConfig hpc.hystrixConfig.map(HystrixConfig.toJsonModel) should be(Some(ejhc)) hpc.additionalValidStatuses should equal(jhpc.additionalValidStatuses.to[SortedSet]) hpc.parameters.map(PartParam.toJsonModel) should be(jhpc.parameters.to[SortedSet]) @@ -214,7 +211,7 @@ class JsonConversionSpec extends FunSpec with Matchers with Checkers with Genera hpc.alertPercentThreshold should be(jhpc.alertMailSettings.alertPercentThreshold) hpc.alertInterval should be(jhpc.alertMailSettings.alertInterval) hpc.alertMailRecipients should be(jhpc.alertMailSettings.alertMailRecipients) - HttpPartConfig.toJsonModel(hpc) should be(jhpc.copy(hystrixConfig = ejhc)) + HttpPartConfig.toJsonModel(hpc) shouldBe jhpc } } } diff --git a/test/com/m3/octoparts/repository/config/HttpPartConfigRepositorySpec.scala b/test/com/m3/octoparts/repository/config/HttpPartConfigRepositorySpec.scala index 45dfaaff..497e6f2c 100644 --- a/test/com/m3/octoparts/repository/config/HttpPartConfigRepositorySpec.scala +++ b/test/com/m3/octoparts/repository/config/HttpPartConfigRepositorySpec.scala @@ -1,6 +1,7 @@ package com.m3.octoparts.repository.config import java.nio.charset.StandardCharsets +import java.sql.SQLException import com.m3.octoparts.model.HttpMethod._ import com.m3.octoparts.model.config._ @@ -149,7 +150,7 @@ class HttpPartConfigRepositorySpec extends fixture.FunSpec with DBSuite with Mat describe("hasOne HystrixConfig") { - def setup(implicit session: DBSession): Long = { + def setup(queueSize: Int = 500)(implicit session: DBSession): Long = { val partId = HttpPartConfigRepository.createWithAttributes(httpPartConfigMapWithDeprecation: _*) PartParamRepository.createWithAttributes( 'httpPartConfigId -> partId, @@ -159,7 +160,8 @@ class HttpPartConfigRepositorySpec extends fixture.FunSpec with DBSuite with Mat ) val threadPoolConfigId = ThreadPoolConfigRepository.createWithAttributes( 'threadPoolKey -> "swimmingpool", - 'coreSize -> 10 + 'coreSize -> 10, + 'queueSize -> queueSize ) HystrixConfigRepository.createWithAttributes( 'httpPartConfigId -> partId, @@ -173,7 +175,7 @@ class HttpPartConfigRepositorySpec extends fixture.FunSpec with DBSuite with Mat it("should be possible to retrieve the HystrixConfig of a HttpPartConfig with the .joins method") { implicit session => - val partId = setup(session) + val partId = setup() val config = HttpPartConfigRepository.joins(HttpPartConfigRepository.hystrixConfigRef).findById(partId).get val hystrixConfig = config.hystrixConfig.get hystrixConfig.timeout should be(3.seconds) @@ -183,12 +185,25 @@ class HttpPartConfigRepositorySpec extends fixture.FunSpec with DBSuite with Mat it("should be possible to retrieve the HystrixConfig of a HttpPartConfig, along with it's ThreadPoolConfig, using the .includes method") { implicit session => - val partId = setup(session) + val partId = setup() val config = HttpPartConfigRepository.includes(HttpPartConfigRepository.hystrixConfigRef).findById(partId).get val hystrixConfig = config.hystrixConfig.get val Some(threadPoolConfig) = hystrixConfig.threadPoolConfig // will fail if not eager loaded threadPoolConfig.threadPoolKey should be("swimmingpool") threadPoolConfig.coreSize should be(10) + threadPoolConfig.queueSize should be(500) + } + + it("should allow us to save with queueSize of -1") { + implicit session => + setup(-1) + } + + it("should not allow us to save if queueSize is less than -1") { + implicit session => + intercept[SQLException] { + setup(-2) + } } } @@ -200,7 +215,7 @@ class HttpPartConfigRepositorySpec extends fixture.FunSpec with DBSuite with Mat PartParam(required = false, versioned = false, paramType = ParamType.Header, outputName = "myHeader", createdAt = DateTime.now, updatedAt = DateTime.now) ) - lazy val threadPool = ThreadPoolConfig(threadPoolKey = "myThreadPool", coreSize = 5, createdAt = DateTime.now, updatedAt = DateTime.now) + lazy val threadPool = ThreadPoolConfig(threadPoolKey = "myThreadPool", coreSize = 5, queueSize = 500, createdAt = DateTime.now, updatedAt = DateTime.now) /* Inside a method that takes a session because we need to refer to a valid ThreadPoolConfig id when @@ -252,6 +267,7 @@ class HttpPartConfigRepositorySpec extends fixture.FunSpec with DBSuite with Mat observedThreadPoolConfig.threadPoolKey should be(threadPool.threadPoolKey) observedThreadPoolConfig.coreSize should be(threadPool.coreSize) + observedThreadPoolConfig.queueSize should be(threadPool.queueSize) } describe("to insert") { diff --git a/test/controllers/AdminControllerSpec.scala b/test/controllers/AdminControllerSpec.scala index 0625b0f8..981463bb 100644 --- a/test/controllers/AdminControllerSpec.scala +++ b/test/controllers/AdminControllerSpec.scala @@ -531,7 +531,7 @@ class AdminControllerSpec extends FunSpec val adminController = new AdminController(cacheOps = DummyCacheOps, repository = repository) doReturn(Future.successful(76L)).when(repository).save(anyObject[ThreadPoolConfig]())(anyObject[ConfigMapper[ThreadPoolConfig]], anyObject[Span]) val createThreadPool = adminController.createThreadPool( - FakeRequest().withFormUrlEncodedBody("threadPoolKey" -> "myNewThreadPool", "coreSize" -> "99") + FakeRequest().withFormUrlEncodedBody("threadPoolKey" -> "myNewThreadPool", "coreSize" -> "99", "queueSize" -> "987") ) whenReady(createThreadPool) { result => status(createThreadPool) should be(FOUND) @@ -549,7 +549,7 @@ class AdminControllerSpec extends FunSpec doReturn(Future.successful(Some(tpc))).when(repository).findThreadPoolConfigById(anyLong())(anyObject[Span]) doReturn(Future.successful(123L)).when(repository).save(anyObject[ThreadPoolConfig]())(anyObject[ConfigMapper[ThreadPoolConfig]], anyObject[Span]) - val updateThreadPool = adminController.updateThreadPool(123L)(FakeRequest().withFormUrlEncodedBody("threadPoolKey" -> "myNewThreadPool", "coreSize" -> "99")) + val updateThreadPool = adminController.updateThreadPool(123L)(FakeRequest().withFormUrlEncodedBody("threadPoolKey" -> "myNewThreadPool", "queueSize" -> "512", "coreSize" -> "99")) whenReady(updateThreadPool) { result => verify(repository).findThreadPoolConfigById(123L) diff --git a/test/integration/AdminSpec.scala b/test/integration/AdminSpec.scala new file mode 100644 index 00000000..3227704d --- /dev/null +++ b/test/integration/AdminSpec.scala @@ -0,0 +1,50 @@ +package integration + +import org.openqa.selenium.htmlunit.HtmlUnitDriver +import org.scalatest.{ Matchers, FunSpec } +import org.scalatest.selenium.{ Page => SeleniumPage } +import org.scalatest.concurrent.{ IntegrationPatience, ScalaFutures } +import org.scalatestplus.play.{ HtmlUnitFactory, OneBrowserPerSuite, OneServerPerSuite } + +class AdminSpec + extends FunSpec + with OneServerPerSuite + with OneBrowserPerSuite + with HtmlUnitFactory + with Matchers + with ScalaFutures + with IntegrationPatience { + + val htmlunitDriver = webDriver.asInstanceOf[HtmlUnitDriver] + htmlunitDriver.setJavascriptEnabled(false) + + def baseUrl: String = { + s"http://localhost:$port" + } + + object ThreadPoolAddPage extends SeleniumPage { + val url: String = s"$baseUrl/admin/thread-pools/new" + } + + describe("adding a thread pool") { + + it("should work and redirect me to the show page") { + val name = s"my little pool ${java.util.UUID.randomUUID}" + val size = 10 + val queueSize = 500 + goTo(ThreadPoolAddPage) + textField("threadPoolKey").value = name + numberField("coreSize").value = s"$size" + numberField("queueSize").value = s"$queueSize" + submit() + eventually { + pageTitle should include("Thread pool details") + } + val descriptors = findAll(TagNameQuery("dd")) + descriptors.find(_.text == name) shouldBe 'defined + descriptors.find(_.text == size.toString) shouldBe 'defined + descriptors.find(_.text == queueSize.toString) shouldBe 'defined + } + + } +} diff --git a/vaygrant/provisioning/roles/zipkin/tasks/main.yml b/vaygrant/provisioning/roles/zipkin/tasks/main.yml index dc56aba0..319fc9e2 100644 --- a/vaygrant/provisioning/roles/zipkin/tasks/main.yml +++ b/vaygrant/provisioning/roles/zipkin/tasks/main.yml @@ -54,6 +54,7 @@ - name: Extract Zipkin Source (Zipkin Web only) command: unzip /tmp/zipkin-1.1.0-src.zip "zipkin-1.1.0/zipkin-web/*" -d /opt/zipkin + creates=/opt/zipkin sudo: yes sudo_user: zipkin tags: zipkin @@ -61,6 +62,7 @@ - name: Move Zipkin Web Source folder command: mv /opt/zipkin/zipkin-1.1.0/zipkin-web /opt/zipkin/zipkin-web + creates=/opt/zipkin/zipkin-web sudo: yes sudo_user: zipkin tags: zipkin