Skip to content

Commit

Permalink
resolves #2
Browse files Browse the repository at this point in the history
  • Loading branch information
Romil Punetha committed Nov 24, 2022
1 parent 0e35573 commit f87dec3
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 26 deletions.
3 changes: 2 additions & 1 deletion eventscope/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ actix-web = "4.2.1"
serde = { version = "1.0.147", features = ["derive"] }
serde_json = "1.0.88"
clickhouse-rs = "1.0.0-alpha.1"
time = { version = "0.3.17", features = ["serde", "serde-well-known"] }
time = { version = "0.3.17", features = ["serde-well-known"] }
uuid = { version = "1.1.2", features = ["v4", "serde"] }
big_s = "1.0.2"
flatten-serde-json = "0.1.0"
rdkafka = "0.29.0"
clickhouse = { path = "../../clickhouse.rs", features = ["uuid", "time"]}
61 changes: 36 additions & 25 deletions eventscope/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@ use actix_web::cookie::time;
use actix_web::http::header::ContentType;
use actix_web::web::Json;
use actix_web::{get, post, web, App, HttpResponse, HttpServer, Responder};
use clickhouse::{Client, Row};
use flatten_serde_json::flatten;
use rdkafka::config::ClientConfig;
use rdkafka::producer::{FutureProducer, FutureRecord};
use serde::{Deserialize, Serialize};
use serde_json::{Number, Value};
use serde_json::Value;
use std::time::Duration;
use time::OffsetDateTime;
use uuid::Uuid;
Expand All @@ -21,46 +22,43 @@ pub struct EventInput {
timestamp: Option<OffsetDateTime>,
}

#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Serialize, Deserialize, Row)]
pub struct Event {
#[serde(deserialize_with = "clickhouse::serde::uuid::deserialize")]
uuid: Uuid,
event_name: String,
// _namespace: String,
_source: String,
#[serde(with = "time::serde::rfc3339")]
_timestamp: OffsetDateTime,
#[serde(with = "time::serde::rfc3339")]
_created_at: OffsetDateTime,
source: String,
#[serde(with = "clickhouse::serde::time::datetime64::micros")]
timestamp: OffsetDateTime,
#[serde(with = "clickhouse::serde::time::datetime64::micros")]
created_at: OffsetDateTime,
string_names: Vec<String>,
string_values: Vec<String>,
number_names: Vec<String>,
number_values: Vec<Number>,
number_values: Vec<f64>,
bool_names: Vec<String>,
bool_values: Vec<bool>,
array_names: Vec<String>,
array_values: Vec<Vec<String>>,
// team_id: String,
// distinct_id: String,
// created_at: OffsetDateTime,
// person_id: String,
// person_created_at: OffsetDateTime,
// person_properties: String,
}

impl Event {
pub fn new(event_input: &EventInput) -> Event {
let flattened_properties = flatten(&std::mem::take(
event_input.clone().properties.as_object_mut().unwrap(),
));
let now = OffsetDateTime::now_utc();

let mut event = Event {
uuid: event_input.uuid,
event_name: event_input.event_name.clone(),
_source: event_input.properties.to_string(),
_timestamp: match event_input.timestamp {
source: event_input.properties.to_string(),
timestamp: match event_input.timestamp {
Some(timestamp) => timestamp,
None => OffsetDateTime::now_utc(),
None => now,
},
_created_at: OffsetDateTime::now_utc(),
created_at: now,
string_names: vec![],
string_values: vec![],
number_names: vec![],
Expand All @@ -78,7 +76,7 @@ impl Event {
}
Value::Number(value) => {
event.number_names.push(key.to_owned());
event.number_values.push(value.clone());
event.number_values.push(value.as_f64().unwrap());
}
Value::String(value) => {
event.string_names.push(key.to_owned());
Expand All @@ -104,8 +102,21 @@ impl Event {
}

#[get("/")]
async fn hello() -> impl Responder {
HttpResponse::Ok().body("Hello world!")
async fn get_events() -> impl Responder {
let client = Client::default()
.with_url("http://localhost:8123")
.with_database("default");

let mut cursor = client
.query("SELECT ?fields FROM default.Events")
.fetch::<Event>()
.unwrap();

while let Some(row) = cursor.next().await.unwrap() {
println!("{:#?}", row);
}

HttpResponse::Ok().body("hello")
}

#[post("/echo")]
Expand All @@ -123,9 +134,9 @@ async fn index(event_input: Json<EventInput>) -> HttpResponse {
.create()
.expect("Producer creation error");

let event_payload = serde_json::to_string(&event).unwrap();
let event_payload = serde_json::to_string_pretty(&event).unwrap();
let event_key = event.uuid.to_string();
let event_record = FutureRecord::to("EventInput")
let event_record = FutureRecord::to("Events")
.payload(&event_payload)
.key(&event_key);

Expand All @@ -146,7 +157,7 @@ async fn index(event_input: Json<EventInput>) -> HttpResponse {

HttpResponse::Ok()
.content_type(ContentType::json())
.body(format!("{:#?}", event))
.body(serde_json::to_string(&event).unwrap())
}

async fn manual_hello() -> impl Responder {
Expand All @@ -157,7 +168,7 @@ async fn manual_hello() -> impl Responder {
async fn main() -> std::io::Result<()> {
HttpServer::new(|| {
App::new()
.service(hello)
.service(get_events)
.service(echo)
.service(index)
.route("/hey", web::get().to(manual_hello))
Expand Down

0 comments on commit f87dec3

Please sign in to comment.