Skip to content
This repository has been archived by the owner on Nov 22, 2024. It is now read-only.

Commit

Permalink
Make it possible to deploy an application to different namespaces (#1080
Browse files Browse the repository at this point in the history
)
  • Loading branch information
andreaTP authored Jul 28, 2021
1 parent e9a48e2 commit c7e2391
Show file tree
Hide file tree
Showing 17 changed files with 359 additions and 182 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,19 @@ object OptionsParser {
})
}

private val namespace = {
import scala.language.existentials
opt[String]('n', "namespace")
.text(s"the namespace to be used")
.action((ns, o) => {
val cmd = o.command match {
case Some(v: commands.Command[_]) => Some(v.withNamespace(ns))
case _ => None
}
o.copy(command = cmd)
})
}

private def detailedHelp(command: String)(msg: String) = {
// this is hackish, scopt doesn't support short/long helps and here we are simulating the long option
opt[Unit]("help")
Expand Down Expand Up @@ -107,7 +120,7 @@ object OptionsParser {
cmd("list")
.action((_, o) => o.copy(command = Some(commands.List())))
.text("list available cloudflow applications")
.children(outputFmt)
.children(namespace, outputFmt)
}

private def commandParse[C <: commands.Command[_]: ClassTag, T: Read](parser: OParser[T, Options])(f: (C, T) => C) = {
Expand Down Expand Up @@ -135,6 +148,7 @@ object OptionsParser {
commandParse[commands.Status, String](arg("<cloudflowApp>"))((c, v) => c.copy(cloudflowApp = v))
.required()
.text("the name of the cloudflow application"),
namespace,
outputFmt)
}

Expand All @@ -146,6 +160,7 @@ object OptionsParser {
commandParse[commands.Configuration, String](arg("<cloudflowApp>"))((c, v) => c.copy(cloudflowApp = v))
.required()
.text("the name of the cloudflow application"),
namespace,
outputFmt)
}

Expand Down Expand Up @@ -236,6 +251,7 @@ object OptionsParser {
success
}
}),
namespace,
outputFmt)
}

Expand All @@ -247,6 +263,9 @@ object OptionsParser {
commandParse[commands.UpdateCredentials, String](arg("<cloudflowApp>"))((u, v) => u.copy(cloudflowApp = v))
.required()
.text("the name of the cloudflow application"),
commandParse[commands.UpdateCredentials, String](opt('n', "namespace"))((c, v) => c.copy(namespace = Some(v)))
.optional()
.text("the namespace to be used"),
commandParse[commands.UpdateCredentials, String](arg("<dockerRegistry>"))((u, v) => u.copy(dockerRegistry = v))
.required()
.text("the name of the docker registry"),
Expand All @@ -269,6 +288,7 @@ object OptionsParser {
success
}
}),
namespace,
outputFmt)
}

Expand All @@ -280,6 +300,7 @@ object OptionsParser {
commandParse[commands.Undeploy, String](arg("<cloudflowApp>"))((c, v) => c.copy(cloudflowApp = v))
.required()
.text("the name of the cloudflow application"),
namespace,
outputFmt)
}

Expand All @@ -303,6 +324,7 @@ object OptionsParser {
success
}
}),
namespace,
outputFmt)
}

Expand Down Expand Up @@ -340,6 +362,7 @@ object OptionsParser {
failure("a configuration file doesn't exists")
} else success
}),
namespace,
outputFmt)
}

Expand Down Expand Up @@ -386,15 +409,19 @@ object commands {

sealed trait Command[T] {
val output: format.Format
val namespace: Option[String]

def execution(kubeClient: => KubeClient, logger: CliLogger): Execution[T]

def render(result: T): String

def withOutput(fmt: format.Format): Command[T]

def withNamespace(namespace: String): Command[T]
}

case class Version(output: format.Format = format.Default) extends Command[VersionResult] {
case class Version(namespace: Option[String] = None, output: format.Format = format.Default)
extends Command[VersionResult] {

def execution(kubeClient: => KubeClient, logger: CliLogger): Execution[VersionResult] = {
VersionExecution(this)
Expand All @@ -405,9 +432,12 @@ object commands {
}

def withOutput(fmt: format.Format) = this.copy(output = fmt)

def withNamespace(namespace: String) = this.copy(namespace = Some(namespace))
}

case class List(output: format.Format = format.Default) extends Command[ListResult] {
case class List(namespace: Option[String] = None, output: format.Format = format.Default)
extends Command[ListResult] {

def execution(kubeClient: => KubeClient, logger: CliLogger): Execution[ListResult] = {
ListExecution(this, kubeClient, logger)
Expand All @@ -418,9 +448,12 @@ object commands {
}

def withOutput(fmt: format.Format) = this.copy(output = fmt)

def withNamespace(namespace: String) = this.copy(namespace = Some(namespace))
}

case class Status(cloudflowApp: String = "", output: format.Format = format.Default) extends Command[StatusResult] {
case class Status(cloudflowApp: String = "", namespace: Option[String] = None, output: format.Format = format.Default)
extends Command[StatusResult] {

def execution(kubeClient: => KubeClient, logger: CliLogger): Execution[StatusResult] = {
StatusExecution(this, kubeClient, logger)
Expand All @@ -431,9 +464,14 @@ object commands {
}

def withOutput(fmt: format.Format) = this.copy(output = fmt)

def withNamespace(namespace: String) = this.copy(namespace = Some(namespace))
}

case class Configuration(cloudflowApp: String = "", output: format.Format = format.Default)
case class Configuration(
cloudflowApp: String = "",
namespace: Option[String] = None,
output: format.Format = format.Default)
extends Command[ConfigurationResult] {

def execution(kubeClient: => KubeClient, logger: CliLogger): Execution[ConfigurationResult] = {
Expand All @@ -445,10 +483,13 @@ object commands {
}

def withOutput(fmt: format.Format) = this.copy(output = fmt)

def withNamespace(namespace: String) = this.copy(namespace = Some(namespace))
}

case class Deploy(
crFile: File = new File(""),
namespace: Option[String] = None,
dockerUsername: String = "",
dockerPassword: String = "",
noRegistryCredentials: Boolean = false,
Expand All @@ -474,9 +515,13 @@ object commands {

def withOutput(fmt: format.Format) = this.copy(output = fmt)

def withNamespace(namespace: String) = this.copy(namespace = Some(namespace))
}

case class Undeploy(cloudflowApp: String = "", output: format.Format = format.Default)
case class Undeploy(
cloudflowApp: String = "",
namespace: Option[String] = None,
output: format.Format = format.Default)
extends Command[UndeployResult] {

def execution(kubeClient: => KubeClient, logger: CliLogger): Execution[UndeployResult] = {
Expand All @@ -488,10 +533,13 @@ object commands {
}

def withOutput(fmt: format.Format) = this.copy(output = fmt)

def withNamespace(namespace: String) = this.copy(namespace = Some(namespace))
}

case class Configure(
cloudflowApp: String = "",
namespace: Option[String] = None,
confs: Seq[File] = Seq(),
configKeys: Map[String, String] = Map(),
logbackConfig: Option[File] = None,
Expand All @@ -509,6 +557,8 @@ object commands {
}

def withOutput(fmt: format.Format) = this.copy(output = fmt)

def withNamespace(namespace: String) = this.copy(namespace = Some(namespace))
}

trait WithConfiguration {
Expand Down Expand Up @@ -553,6 +603,7 @@ object commands {

case class UpdateCredentials(
cloudflowApp: String = "",
namespace: Option[String] = None,
dockerRegistry: String = "",
username: String = "",
password: String = "",
Expand All @@ -569,9 +620,15 @@ object commands {
}

def withOutput(fmt: format.Format) = this.copy(output = fmt)

def withNamespace(namespace: String) = this.copy(namespace = Some(namespace))
}

case class Scale(cloudflowApp: String = "", scales: Map[String, Int] = Map(), output: format.Format = format.Default)
case class Scale(
cloudflowApp: String = "",
namespace: Option[String] = None,
scales: Map[String, Int] = Map(),
output: format.Format = format.Default)
extends Command[ScaleResult] {

def execution(kubeClient: => KubeClient, logger: CliLogger): Execution[ScaleResult] = {
Expand All @@ -583,6 +640,8 @@ object commands {
}

def withOutput(fmt: format.Format) = this.copy(output = fmt)

def withNamespace(namespace: String) = this.copy(namespace = Some(namespace))
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ final case class ConfigurationExecution(c: Configuration, client: KubeClient, lo
logger.info("Executing command Configuration")
for {
_ <- validateProtocolVersion(client)
res <- client.getAppInputSecret(c.cloudflowApp)
res <- client.getAppInputSecret(c.cloudflowApp, c.namespace.getOrElse(c.cloudflowApp))
config <- Try { ConfigFactory.parseString(res) }.recover {
case ex => throw CliException("Failed to parse the current configuration", ex)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@ final case class ConfigureExecution(c: Configure, client: KubeClient, logger: Cl
logger.info("Executing command Configure")
for {
_ <- validateProtocolVersion(client)
namespace = c.namespace.getOrElse(c.cloudflowApp)

currentCr <- client.readCloudflowApp(c.cloudflowApp).map {
currentCr <- client.readCloudflowApp(c.cloudflowApp, namespace).map {
_.getOrElse(throw CliException(s"Cloudflow application ${c.cloudflowApp} not found in the cluster"))
}

Expand All @@ -38,8 +39,14 @@ final case class ConfigureExecution(c: Configure, client: KubeClient, logger: Cl
c.microservices,
() => client.getKafkaClusters(None).map(parseValues))

uid <- client.uidCloudflowApp(currentCr.spec.appId)
_ <- client.configureCloudflowApp(currentCr.spec.appId, uid, configStr, logbackContent, streamletsConfigs)
uid <- client.uidCloudflowApp(currentCr.spec.appId, namespace)
_ <- client.configureCloudflowApp(
currentCr.spec.appId,
namespace,
uid,
configStr,
logbackContent,
streamletsConfigs)
} yield {
ConfigureResult()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,9 +156,10 @@ final case class DeployExecution(d: Deploy, client: KubeClient, logger: CliLogge

// prepare the data
localApplicationCr <- loadCrFile(d.crFile)
namespace = d.namespace.getOrElse(localApplicationCr.spec.appId)

// update the replicas
currentAppCr <- client.readCloudflowApp(localApplicationCr.spec.appId)
currentAppCr <- client.readCloudflowApp(localApplicationCr.spec.appId, namespace)
clusterReplicas = getStreamletsReplicas(currentAppCr)
clusterApplicationCr <- updateReplicas(localApplicationCr, clusterReplicas)
applicationCrReplicas <- updateReplicas(clusterApplicationCr, d.scales)
Expand Down Expand Up @@ -198,7 +199,7 @@ final case class DeployExecution(d: Deploy, client: KubeClient, logger: CliLogge

// Operations on the cluster
name = applicationCr.spec.appId
_ <- client.createNamespace(name)
_ <- client.createNamespace(namespace)
_ <- {
if (d.noRegistryCredentials) Success(())
else {
Expand All @@ -213,13 +214,14 @@ final case class DeployExecution(d: Deploy, client: KubeClient, logger: CliLogge
if (d.microservices) {
client.createMicroservicesApp(
applicationCr.spec,
namespace,
CloudflowToMicroservicesCR
.convert(applicationCr.spec, logbackContent.map(_ => KubeClient.LoggingSecretName)))
} else {
client.createCloudflowApp(applicationCr.spec)
client.createCloudflowApp(applicationCr.spec, namespace)
}
}
_ <- client.configureCloudflowApp(name, uid, configStr, logbackContent, streamletsConfigs)
_ <- client.configureCloudflowApp(name, namespace, uid, configStr, logbackContent, streamletsConfigs)
} yield {
logger.trace("Command Deploy executed successfully")
DeployResult()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ final case class ListExecution(l: List, client: KubeClient, logger: CliLogger)
logger.info("Executing command List")
for {
_ <- validateProtocolVersion(client)
res <- client.listCloudflowApps()
res <- client.listCloudflowApps(l.namespace)
} yield {
ListResult(res)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@ final case class ScaleExecution(s: Scale, client: KubeClient, logger: CliLogger)
logger.info("Executing command Status")
for {
_ <- validateProtocolVersion(client)
namespace = s.namespace.getOrElse(s.cloudflowApp)

currentAppCrOpt <- client.readCloudflowApp(s.cloudflowApp)
currentAppCrOpt <- client.readCloudflowApp(s.cloudflowApp, namespace)

currentAppCr = currentAppCrOpt.getOrElse(throw CliException(s"Application ${s.cloudflowApp} not found"))
applicationCr <- updateReplicas(currentAppCr, s.scales)

_ <- client.updateCloudflowApp(applicationCr)
_ <- client.updateCloudflowApp(applicationCr, namespace)
} yield {
ScaleResult()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ final case class StatusExecution(s: Status, client: KubeClient, logger: CliLogge
logger.info("Executing command Status")
for {
_ <- validateProtocolVersion(client)
res <- client.getCloudflowAppStatus(s.cloudflowApp)
res <- client.getCloudflowAppStatus(s.cloudflowApp, s.namespace.getOrElse(s.cloudflowApp))
} yield {
StatusResult(res)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ final case class UndeployExecution(u: Undeploy, client: KubeClient, logger: CliL
logger.info("Executing command Undeploy")
for {
_ <- validateProtocolVersion(client)
_ <- client.deleteCloudflowApp(u.cloudflowApp)
_ <- client.deleteCloudflowApp(u.cloudflowApp, u.namespace.getOrElse(u.cloudflowApp))
} yield {
UndeployResult()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@ final case class UpdateCredentialsExecution(u: UpdateCredentials, client: KubeCl
logger.info("Executing command UpdateCredentials")
for {
_ <- validateProtocolVersion(client)
_ <- client.createNamespace(u.cloudflowApp)
namespace = u.namespace.getOrElse(u.cloudflowApp)
_ <- client.createNamespace(namespace)
_ <- client.createImagePullSecret(
namespace = u.cloudflowApp,
namespace = namespace,
dockerRegistryURL = u.dockerRegistry,
dockerUsername = u.username,
dockerPassword = u.password)
Expand Down
Loading

0 comments on commit c7e2391

Please sign in to comment.