Skip to content

Commit

Permalink
operator: add SinkWebhook CRDs
Browse files Browse the repository at this point in the history
  • Loading branch information
fracek committed Jul 9, 2023
1 parent 57672ff commit dff0d06
Show file tree
Hide file tree
Showing 8 changed files with 673 additions and 16 deletions.
404 changes: 388 additions & 16 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ members = [
"sink-mongo",
"sink-parquet",
"sink-postgres",
"operator",
"examples/starknet-simple",
]

Expand Down
27 changes: 27 additions & 0 deletions operator/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
[package]
name = "apibara-operator"
version = "0.1.0"
edition.workspace = true

[lib]
name = "apibara_operator"
path = "src/lib.rs"

[[bin]]
name = "apibara-operator"
path = "src/bin.rs"

[dependencies]
anyhow.workspace = true
apibara-observability = { path = "../observability" }
clap.workspace = true
k8s-openapi = { version = "0.18.0", features = ["v1_26", "api", "schemars"] }
kube = { version = "0.83.0", features = ["client", "derive", "runtime", "rustls-tls"], default-features = false }
schemars = "0.8.12"
serde.workspace = true
serde_json.workspace = true
serde_yaml = "0.9.22"
thiserror.workspace = true
tokio.workspace = true
tokio-util.workspace = true
tracing.workspace = true
52 changes: 52 additions & 0 deletions operator/src/bin.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
use apibara_observability::init_opentelemetry;
use apibara_operator::sink::SinkWebhook;
use clap::{Args, Parser, Subcommand};
use kube::CustomResourceExt;

// Command line interface of the operator binary.
//
// NOTE: this is deliberately a plain `//` comment. clap's derive picks up `///` doc
// comments as help text, so adding one here could change the CLI `about` output.
#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
struct Cli {
// The subcommand selected on the command line.
#[command(subcommand)]
command: Command,
}

// Subcommands supported by the operator binary.
//
// The `///` comments on the variants double as the help text clap shows for each
// subcommand, so keep them short and user-facing.
#[derive(Subcommand, Debug)]
enum Command {
/// Generate the operator CRDs and exit.
GenerateCrd(GenerateCrdArgs),
/// Start the operator.
Start(StartArgs),
}

// Arguments of the `GenerateCrd` subcommand. Currently it takes no options.
#[derive(Args, Debug)]
struct GenerateCrdArgs {}

// Arguments of the `Start` subcommand. Currently it takes no options.
#[derive(Args, Debug)]
struct StartArgs {}

/// Print the YAML manifest of every CRD defined by the operator to stdout.
///
/// Each CRD is serialized to its own YAML document and the documents are joined with a
/// `---` separator line.
///
/// # Errors
///
/// Returns an error if a CRD cannot be serialized to YAML.
fn generate_crds(_args: GenerateCrdArgs) -> anyhow::Result<()> {
    let definitions = [SinkWebhook::crd()];

    let mut documents = Vec::with_capacity(definitions.len());
    for definition in &definitions {
        documents.push(serde_yaml::to_string(definition)?);
    }

    let manifest = documents.join("---\n");
    println!("{}", manifest);
    Ok(())
}

/// Start the operator.
///
/// Currently a placeholder: the arguments are ignored and the function returns `Ok(())`
/// immediately without doing any work.
async fn start(_args: StartArgs) -> anyhow::Result<()> {
Ok(())
}

/// Entry point: set up telemetry, parse the command line and run the chosen subcommand.
///
/// # Errors
///
/// Propagates any error returned by telemetry initialization or by the subcommand.
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    init_opentelemetry()?;

    let cli = Cli::parse();
    match cli.command {
        Command::GenerateCrd(crd_args) => generate_crds(crd_args),
        Command::Start(start_args) => start(start_args).await,
    }
}
1 change: 1 addition & 0 deletions operator/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod sink;
152 changes: 152 additions & 0 deletions operator/src/sink/common.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
use std::collections::BTreeMap;

use k8s_openapi::{
api::core::v1::{ConfigMapKeySelector, EnvVarSource, LocalObjectReference, SecretKeySelector},
apimachinery::pkg::apis::meta::v1::Condition,
};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};

/// Source of a metadata value. This is an alias for the Kubernetes `EnvVarSource` type.
pub type MetadataValueSource = EnvVarSource;

/// Common configuration between all sinks.
#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)]
pub struct CommonSpec {
pub persistence: Option<PersistenceSpec>,
pub stream: StreamSpec,
pub image: Option<ImageSpec>,
pub inherited_metadata: Option<InheritedMetadataSpec>,
pub log_level: Option<LogLevel>,
}

/// Configure the image used to run the sink.
#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)]
pub struct ImageSpec {
/// Name of the container image to run.
pub name: Option<String>,
/// Pull policy for the container image. Onef of `Always`, `Never` or `IfNotPresent`.
///
/// More info: https://kubernetes.io/docs/concepts/containers/images#updating-images
pub pull_policy: Option<String>,
/// List of references to secrets in the same namespace to use for pulling any of the images.
///
/// More info: https://kubernetes.io/docs/concepts/containers/images#specifying-imagepullsecrets-on-a-pod
pub pull_secrets: Option<Vec<LocalObjectReference>>,
}

/// Metadata that will be inherited by all resources created by the sink.
//
// Both fields are optional `BTreeMap<String, String>` maps.
#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)]
pub struct InheritedMetadataSpec {
/// Annotations to add to the resources.
pub annotations: Option<BTreeMap<String, String>>,
/// Labels to add to the resources.
pub labels: Option<BTreeMap<String, String>>,
}

/// Persist the sink state to etcd.
///
/// The persistence layer is also used to ensure only one instance of the sink is running at the
/// time.
#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)]
pub struct PersistenceSpec {
/// Etcd cluster connection url.
pub persist_to_etcd: String,
/// Unique sink id.
pub sink_id: String,
}

/// DNA stream configuration.
#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)]
pub struct StreamSpec {
/// URL of the stream. Must start with `http` or `https`.
pub stream_url: String,
/// Filter to select which data to stream.
pub filter: Option<FilterSource>,
/// Transform script to change the shape of the data.
pub transform: Option<TransformSource>,
/// The type of network.
pub network: NetworkTypeSpec,
/// Number of blocks in each historical batch.
pub batch_size: Option<u64>,
/// Maximum message size.
///
/// Accepts size in human readable form, e.g. 1kb, 1MB, 1GB.
pub max_message_size: Option<String>,
/// Data finality.
pub finality: Option<FinalitySpec>,
/// Start streaming data from a specific block.
pub starting_block: Option<u64>,
/// Bearer token used to authenticate with the stream.
pub auth_token: Option<String>,
/// Metadata to add to the stream.
pub metadata: Option<Vec<MetadataSpec>>,
}

/// Use a filter to select which data to stream.
#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)]
pub struct FilterSource {
/// Filter from ConfigMap.
pub config_map_key_ref: Option<ConfigMapKeySelector>,
/// Filter from Secret.
pub secret_key_ref: Option<SecretKeySelector>,
}

/// Use a transform script to change the shape of the data.
#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)]
pub struct TransformSource {
/// Transform from ConfigMap.
pub config_map_key_ref: Option<ConfigMapKeySelector>,
/// Transform from Secret.
pub secret_key_ref: Option<SecretKeySelector>,
}

/// Data finality.
//
// Variants are (de)serialized as lowercase strings because of `rename_all = "lowercase"`,
// e.g. `Finalized` is written as `finalized`.
#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)]
#[serde(rename_all = "lowercase")]
pub enum FinalitySpec {
/// Stream finalized blocks.
Finalized,
/// Stream finalized and accepted blocks.
Accepted,
/// Stream finalized, accepted and pending blocks.
Pending,
}

/// Type of network.
//
// Variants are (de)serialized as lowercase strings because of `rename_all = "lowercase"`,
// e.g. `Starknet` is written as `starknet`.
#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)]
#[serde(rename_all = "lowercase")]
pub enum NetworkTypeSpec {
/// Starknet L2 and appchains.
Starknet,
}

/// Log level for the containers.
//
// Variants are (de)serialized as lowercase strings because of `rename_all = "lowercase"`:
// `error`, `warning`, `info`, `debug`, `trace`.
#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)]
#[serde(rename_all = "lowercase")]
pub enum LogLevel {
Error,
Warning,
Info,
Debug,
Trace,
}

#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)]
pub struct MetadataSpec {
/// The metadata name.
pub name: String,
/// The metadata value.
pub value: Option<String>,
/// Source for the metadata value.
pub value_from: Option<MetadataValueSource>,
}

#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema, PartialEq)]
pub struct CommonStatus {
/// Conditions for the sink object.
pub conditions: Option<Vec<Condition>>,
/// The name of the container running the sink.
pub instance_name: Option<String>,
/// Current phase of the sink.
pub phase: Option<String>,
}
4 changes: 4 additions & 0 deletions operator/src/sink/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
mod common;
mod webhook;

pub use webhook::{SinkWebhook, SinkWebhookSpec, SinkWebhookStatus};
48 changes: 48 additions & 0 deletions operator/src/sink/webhook.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
use k8s_openapi::api::core::v1::EnvVarSource;
use kube::CustomResource;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};

use super::common::{CommonSpec, CommonStatus};

/// Source of a header value. This is an alias for the Kubernetes `EnvVarSource` type.
pub type HeaderValueSource = EnvVarSource;

/// Run a sink that invokes a webhook for each batch of data.
//
// The `kube` derive generates the namespaced `SinkWebhook` custom resource (group
// `apibara.com`, version `v1alpha1`) with this struct as its spec and `SinkWebhookStatus`
// as its status. The print columns read `.metadata.creationTimestamp`, `.status.phase`
// and `.status.instanceName`, so the status must serialize its fields under exactly
// those names.
//
// NOTE(review): `rename_all = "camelCase"` below only renames the fields declared directly
// on this struct; the names of the flattened `CommonSpec` fields are decided by
// `CommonSpec`'s own serde attributes.
#[derive(CustomResource, Deserialize, Serialize, Clone, Debug, JsonSchema)]
#[kube(
kind = "SinkWebhook",
group = "apibara.com",
version = "v1alpha1",
namespaced,
printcolumn = r#"{"name": "Age", "type": "date", "jsonPath": ".metadata.creationTimestamp" }"#,
printcolumn = r#"{"name": "Status", "type": "string", "jsonPath": ".status.phase" }"#,
printcolumn = r#"{"name": "Instance", "type": "string", "jsonPath": ".status.instanceName" }"#
)]
#[kube(status = "SinkWebhookStatus", shortname = "sinkwebhook")]
#[serde(rename_all = "camelCase")]
pub struct SinkWebhookSpec {
// Common sink configuration, inlined at the top level of the spec by `flatten`.
#[serde(flatten)]
pub common: CommonSpec,
/// The target url to send the request to.
pub target_url: String,
/// Additional headers to send with the request.
pub headers: Option<Vec<HeaderSpec>>,
}

#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema, PartialEq)]
pub struct HeaderSpec {
/// The header name.
pub name: String,
/// The header value.
pub value: Option<String>,
/// Source for the header value.
pub value_from: Option<HeaderValueSource>,
}

/// Most recent status of the webhook sink.
//
// NOTE(review): `rename_all = "camelCase"` only renames fields declared directly on this
// struct. The field names of the flattened `CommonStatus` are decided by `CommonStatus`'s
// own serde attributes.
#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct SinkWebhookStatus {
// Common status fields, inlined at the top level of the status by `flatten`.
#[serde(flatten)]
pub common: CommonStatus,
}

0 comments on commit dff0d06

Please sign in to comment.