Skip to content

Commit

Permalink
operator: reconcile sink webhook
Browse files Browse the repository at this point in the history
**Summary**

Add the handler to reconcile webhook sinks by creating a pod with all
the command line flags (from environment) configured.

**Test Plan**

Use the manifests in `operator/examples` to test that the webhook
resources are created.
  • Loading branch information
fracek committed Jul 9, 2023
1 parent dff0d06 commit 65d4825
Show file tree
Hide file tree
Showing 21 changed files with 758 additions and 32 deletions.
2 changes: 1 addition & 1 deletion .editorconfig
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ trim_trailing_whitespace = false
[*.nix]
indent_size = 2

[*.yml]
[*.{yml,yaml}]
indent_size = 2

[*.{json,js,ts}]
Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions docs/integrations/configuration.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,7 @@ Notice that some integrations (for example, Parquet) are limited to finalized bl

These options are used to specify a data filter and transformation function.

- `--filter`: the JSON-encoded filter to use. If it starts with `@`, it's
interpreted as a path to a file.
- `--filter`: path to a JSON-encoded filter to use.
- `--transform`: path to a Javascript or Typescript file that contains the
transformation function.
- `--env-file`: a file containing the environment variables to load and expose
Expand Down
1 change: 1 addition & 0 deletions operator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ path = "src/bin.rs"
anyhow.workspace = true
apibara-observability = { path = "../observability" }
clap.workspace = true
futures.workspace = true
k8s-openapi = { version = "0.18.0", features = ["v1_26", "api", "schemars"] }
kube = { version = "0.83.0", features = ["client", "derive", "runtime", "rustls-tls"], default-features = false }
schemars = "0.8.12"
Expand Down
23 changes: 23 additions & 0 deletions operator/examples/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Apibara Operator Examples

This folder contains example manifests that you can use to test your Apibara
Operator installation.


### Getting Started

If you're deploying integrations that use data from the hosted streams, you
must configure your API Key in `apikey.yaml`.

Change the value of the `production` key to your key, then deploy it with:

```sh
kubectl apply -f apikey.yaml
```


### Content Structure

- `config.yaml`: contains a `ConfigMap` with a filter and transform for the
AVNU exchange on Starknet Goerli.
- `webhook.yaml`: deploys a webhook integration that streams AVNU data to [/dev/null as a Service](https://devnull-as-a-service.com/code/).
8 changes: 8 additions & 0 deletions operator/examples/apikey.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
apiVersion: v1
kind: Secret
metadata:
namespace: default
name: apibara-api-key
stringData:
production: dna_XXX

23 changes: 23 additions & 0 deletions operator/examples/config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
apiVersion: v1
kind: ConfigMap
metadata:
namespace: default
name: avnu-swaps
data:
# AVNU exchange swap events, on Starknet Goerli testnet.
filter.json: |
{
"header": { "weak": true },
"events": [
{
"fromAddress": "0x06d8cd321dcbbf54512eab67c8a6849faf920077a3996f40bb4761adc4f021d2",
"keys": ["0xe316f0d9d2a3affa97de1d99bb2aac0538e2666d0d8545545ead241ef0ccab"]
}
]
}
transform.js: |
// Return the batch as is
export default function transform(batch) {
return batch;
}
28 changes: 28 additions & 0 deletions operator/examples/webhook.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
apiVersion: apibara.com/v1alpha1
kind: SinkWebhook
metadata:
namespace: default
name: webhook-example
spec:
targetUrl: https://devnull-as-a-service.com/dev/null
headers:
- name: x-my-header
value: "test value"
stream:
filter:
configMapKeyRef:
key: filter.json
name: avnu-swaps
streamUrl: https://goerli.starknet.a5a.ch
authToken:
valueFrom:
secretKeyRef:
name: apibara-api-key
key: production
network: starknet
finality: accepted
startingBlock: 781300
inheritedMetadata:
annotations:
"sidecar.opentelemetry.io/inject": true

32 changes: 28 additions & 4 deletions operator/src/bin.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
use apibara_observability::init_opentelemetry;
use apibara_operator::sink::SinkWebhook;
use apibara_operator::{
configuration::{Configuration, SinkWebhookConfiguration},
controller,
sink::SinkWebhook,
};
use clap::{Args, Parser, Subcommand};
use kube::CustomResourceExt;
use kube::{Client, CustomResourceExt};

#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
Expand All @@ -22,7 +26,11 @@ enum Command {
struct GenerateCrdArgs {}

#[derive(Args, Debug)]
struct StartArgs {}
struct StartArgs {
/// The default image to use for the webhook sink.
#[arg(long, env)]
pub sink_webhook_image: Option<String>,
}

fn generate_crds(_args: GenerateCrdArgs) -> anyhow::Result<()> {
let crds = [SinkWebhook::crd()]
Expand All @@ -34,7 +42,10 @@ fn generate_crds(_args: GenerateCrdArgs) -> anyhow::Result<()> {
Ok(())
}

async fn start(_args: StartArgs) -> anyhow::Result<()> {
async fn start(args: StartArgs) -> anyhow::Result<()> {
let client = Client::try_default().await?;
let configuration = args.to_configuration();
controller::start(client, configuration).await?;
Ok(())
}

Expand All @@ -50,3 +61,16 @@ async fn main() -> anyhow::Result<()> {

Ok(())
}

impl StartArgs {
pub fn to_configuration(&self) -> Configuration {
let webhook = SinkWebhookConfiguration {
image: self
.sink_webhook_image
.clone()
.unwrap_or_else(|| "quay.io/apibara/sink-webhook:latest".to_string()),
};

Configuration { webhook }
}
}
10 changes: 10 additions & 0 deletions operator/src/configuration.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
#[derive(Debug, Clone)]
pub struct Configuration {
pub webhook: SinkWebhookConfiguration,
}

#[derive(Debug, Clone)]
pub struct SinkWebhookConfiguration {
/// The image name to use for the webhook container.
pub image: String,
}
51 changes: 51 additions & 0 deletions operator/src/controller.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
use std::fmt::Debug;

use futures::{Future, Stream, StreamExt};
use kube::{
core::Resource,
runtime::{
controller::{Action, Error as ControllerError},
reflector::ObjectRef,
watcher::Error as WatcherError,
},
Client,
};
use tracing::{info, warn};

use crate::{
configuration::Configuration,
reconcile::{Context, Error},
sink::webhook,
};

pub type ReconcileItem<K> = Result<(ObjectRef<K>, Action), ControllerError<Error, WatcherError>>;

pub async fn start(client: Client, configuration: Configuration) -> Result<(), Error> {
info!("controller started");

let ctx = Context {
client,
configuration,
};
let webhook_controller = webhook::start_controller(ctx.clone()).await?;

run_controller_to_end(webhook_controller).await;

info!("controller terminated");
Ok(())
}

fn run_controller_to_end<K>(
controller_stream: impl Stream<Item = ReconcileItem<K>>,
) -> impl Future<Output = ()>
where
K: Resource + Debug,
<K as Resource>::DynamicType: Debug,
{
controller_stream.for_each(|res| async move {
match res {
Ok((obj, action)) => info!(obj = ?obj, action = ?action, "reconcile success"),
Err(err) => warn!(err = ?err, "reconcile failed"),
}
})
}
3 changes: 3 additions & 0 deletions operator/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1 +1,4 @@
pub mod configuration;
pub mod controller;
pub mod reconcile;
pub mod sink;
30 changes: 30 additions & 0 deletions operator/src/reconcile.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
use kube::{
runtime::{
controller::{Action, Error as ControllerError},
reflector::ObjectRef,
watcher::Error as WatcherError,
},
Client,
};

use crate::configuration::Configuration;

#[derive(Clone)]
pub struct Context {
/// Kube client.
pub client: Client,
/// Operator configuration.
pub configuration: Configuration,
}

#[derive(thiserror::Error, Debug)]
pub enum Error {
#[error("CRD not installed: {0}")]
CrdNotInstalled(String),
#[error("Finalizer error: {0}")]
Finalizer(#[source] Box<kube::runtime::finalizer::Error<Error>>),
#[error("Kube error: {0}")]
Kube(#[from] kube::Error),
}

pub type ReconcileItem<K> = Result<(ObjectRef<K>, Action), ControllerError<Error, WatcherError>>;
Loading

0 comments on commit 65d4825

Please sign in to comment.