Skip to content

Commit

Permalink
Add more commands for dash cli
Browse files Browse the repository at this point in the history
  • Loading branch information
HoKim98 committed May 18, 2023
1 parent d050bdc commit d214e52
Show file tree
Hide file tree
Showing 3 changed files with 154 additions and 10 deletions.
60 changes: 50 additions & 10 deletions dash/cli/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
use std::env;
use std::{env, future::Future};

use anyhow::Result;
use ark_core::logger;
use clap::{value_parser, ArgAction, Parser, Subcommand};
use dash_provider::{client::FunctionSession, input::InputFieldString};
use dash_provider_api::{SessionContextMetadata, SessionResult};
use kube::Client;
use serde::Serialize;
use serde_json::Value;

#[derive(Parser)]
#[command(author, version, about, long_about = None)]
Expand All @@ -18,7 +20,7 @@ struct Args {
}

impl Args {
async fn run(self) -> SessionResult {
async fn run(self) -> SessionResult<Value> {
match self.common.run() {
Ok(()) => self.command.run().await,
Err(e) => Err(e).into(),
Expand Down Expand Up @@ -57,41 +59,79 @@ impl ArgsCommon {

#[derive(Subcommand)]
enum Commands {
Create(CommandCreate),
Create(CommandSession),
Delete(CommandSession),
Exists(CommandSession),
}

impl Commands {
async fn run(self) -> SessionResult {
async fn run(self) -> SessionResult<Value> {
let kube = match Client::try_default().await {
Ok(kube) => kube,
Err(e) => return Err(e).into(),
};

match self {
Self::Create(command) => command.run(kube).await,
Self::Create(command) => command.create(kube).await,
Self::Delete(command) => command.delete(kube).await,
Self::Exists(command) => command.exists(kube).await,
}
}
}

/// Create a resource from a file or from stdin.
#[derive(Parser)]
struct CommandCreate {
struct CommandSession {
/// Set a function name
#[arg(short, long, env = "DASH_FUNCTION", value_name = "NAME")]
function: String,

/// Set values by manual
#[arg(short = 'v', long = "value")]
inputs: Vec<InputFieldString>,

/// Set a target namespace
#[arg(long, env = "DASH_NAMESPACE", value_name = "NAMESPACE")]
namespace: Option<String>,
}

impl CommandCreate {
async fn run(self, kube: Client) -> SessionResult {
impl CommandSession {
async fn run<F, Fut, R>(self, kube: Client, f: F) -> SessionResult<Value>
where
F: FnOnce(Client, SessionContextMetadata, Vec<InputFieldString>) -> Fut,
Fut: Future<Output = SessionResult<R>>,
R: Serialize,
{
let metadata = SessionContextMetadata {
name: self.function,
namespace: kube.default_namespace().to_string(),
namespace: self
.namespace
.unwrap_or_else(|| kube.default_namespace().to_string()),
};
FunctionSession::create_raw(kube, &metadata, self.inputs).await
f(kube, metadata, self.inputs)
.await
.and_then(::serde_json::to_value)
}

async fn create(self, kube: Client) -> SessionResult<Value> {
self.run(kube, |kube, metadata, inputs| async move {
FunctionSession::create_raw(kube, &metadata, inputs).await
})
.await
}

async fn delete(self, kube: Client) -> SessionResult<Value> {
self.run(kube, |kube, metadata, inputs| async move {
FunctionSession::create_raw(kube, &metadata, inputs).await
})
.await
}

async fn exists(self, kube: Client) -> SessionResult<Value> {
self.run(kube, |kube, metadata, inputs| async move {
FunctionSession::exists_raw(kube, &metadata, inputs).await
})
.await
}
}

Expand Down
13 changes: 13 additions & 0 deletions dash/provider/api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,19 @@ where
}
}

impl<T> SessionResult<T> {
pub fn and_then<F, T2, E>(self, f: F) -> SessionResult<T2>
where
F: FnOnce(T) -> Result<T2, E>,
E: ToString,
{
match self {
Self::Ok(e) => f(e).into(),
Self::Err(e) => SessionResult::Err(e),
}
}
}

#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct FunctionChannel {
Expand Down
91 changes: 91 additions & 0 deletions dash/provider/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,38 @@ impl<'a> FunctionSession<'a> {
Ok(())
}

pub async fn exists_raw<Value>(
kube: Client,
metadata: &'a SessionContextMetadata,
inputs: Vec<InputField<Value>>,
) -> SessionResult<bool>
where
Self: FunctionSessionUpdateFields<Value>,
{
Self::load(kube, metadata)
.and_then(|session| session.try_exists_raw(inputs))
.await
.into()
}

async fn try_exists_raw<Value>(mut self, inputs: Vec<InputField<Value>>) -> Result<bool>
where
Self: FunctionSessionUpdateFields<Value>,
{
let input = SessionContext {
metadata: self.metadata.clone(),
spec: {
self.update_fields(inputs).await?;
self.input.finalize()?
},
};

self.client
.exists_raw(&input)
.await
.map_err(|e| anyhow!("failed to check function {:?}: {e}", &self.metadata.name))
}

pub async fn create_raw<Value>(
kube: Client,
metadata: &'a SessionContextMetadata,
Expand Down Expand Up @@ -130,6 +162,41 @@ impl<'a> FunctionSession<'a> {
.await
.map_err(|e| anyhow!("failed to create function {:?}: {e}", &self.metadata.name))
}

pub async fn delete_raw<Value>(
kube: Client,
metadata: &'a SessionContextMetadata,
inputs: Vec<InputField<Value>>,
) -> SessionResult
where
Self: FunctionSessionUpdateFields<Value>,
{
Self::load(kube, metadata)
.and_then(|session| session.try_delete_raw(inputs))
.await
.into()
}

async fn try_delete_raw<Value>(
mut self,
inputs: Vec<InputField<Value>>,
) -> Result<FunctionChannel>
where
Self: FunctionSessionUpdateFields<Value>,
{
let input = SessionContext {
metadata: self.metadata.clone(),
spec: {
self.update_fields(inputs).await?;
self.input.finalize()?
},
};

self.client
.delete_raw(&input)
.await
.map_err(|e| anyhow!("failed to delete function {:?}: {e}", &self.metadata.name))
}
}

pub enum FunctionActorClient {
Expand All @@ -152,6 +219,15 @@ impl FunctionActorClient {
}
}

pub async fn exists_raw<Spec>(&self, input: &SessionContext<Spec>) -> Result<bool>
where
Spec: Serialize,
{
match self {
Self::Job(client) => client.exists_raw(input).await,
}
}

pub async fn create_raw<Spec>(&self, input: &SessionContext<Spec>) -> Result<FunctionChannel>
where
Spec: Serialize,
Expand All @@ -166,4 +242,19 @@ impl FunctionActorClient {
},
})
}

pub async fn delete_raw<Spec>(&self, input: &SessionContext<Spec>) -> Result<FunctionChannel>
where
Spec: Serialize,
{
Ok(FunctionChannel {
metadata: input.metadata.clone(),
actor: match self {
Self::Job(client) => client
.delete_raw(input)
.await
.map(FunctionChannelKind::Job)?,
},
})
}
}

0 comments on commit d214e52

Please sign in to comment.