Skip to content

Commit

Permalink
Merge pull request #3 from estuary/phil/quieter-logging
Browse files Browse the repository at this point in the history
update proto-flow dependency, and add missing snapshots
  • Loading branch information
psFried authored Jan 5, 2024
2 parents 4452fe9 + c448bae commit 5911974
Show file tree
Hide file tree
Showing 5 changed files with 101 additions and 53 deletions.
78 changes: 44 additions & 34 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

27 changes: 16 additions & 11 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ pub mod pull;
pub mod state;
pub mod transactor;

use std::mem;

use self::fetcher::Fetcher;
use anyhow::Context;

Expand Down Expand Up @@ -59,11 +61,16 @@ pub async fn run_connector(
if let Some(_) = spec {
return do_spec(stdout).await;
}
if let Some(discover_req) = discover {
return do_discover(discover_req.config_json, stdout).await;
if let Some(mut discover_req) = discover {
return do_discover(mem::take(&mut discover_req.config_json), stdout).await;
}
if let Some(validate_req) = validate {
return do_validate(validate_req.config_json, validate_req.bindings, stdout).await;
if let Some(mut validate_req) = validate {
return do_validate(
mem::take(&mut validate_req.config_json),
mem::take(&mut validate_req.bindings),
stdout,
)
.await;
}
if let Some(_) = apply {
return do_apply(stdout).await;
Expand Down Expand Up @@ -98,8 +105,9 @@ async fn do_spec(mut stdout: io::Stdout) -> anyhow::Result<()> {
protocol: 3032023,
config_schema_json,
resource_config_schema_json,
documentation_url: "https://go.estuary.dev/http-ingest".to_string(),
documentation_url: "https://go.estuary.dev/placeholder".to_string(),
oauth2: None,
resource_path_pointers: vec!["/creatorId".to_string()],
}),
..Default::default()
};
Expand Down Expand Up @@ -139,12 +147,8 @@ async fn do_validate(
.context("failed to connect to ballchasing api")?;
tracing::info!(?ping_response, "successfully pinged the ballchasing API");
let mut output = Vec::with_capacity(bindings.len());
for ValidateBinding {
collection: _,
resource_config_json,
} in bindings
{
let resource_config = serde_json::from_str::<ResourceConfig>(&resource_config_json)
for binding in bindings {
let resource_config = serde_json::from_str::<ResourceConfig>(&binding.resource_config_json)
.context("deserializing resource config")?;

let groups = fetcher
Expand Down Expand Up @@ -233,6 +237,7 @@ fn discovered_collection(steam_id: String) -> DiscoveredBinding {
}))
.unwrap(),
key: vec!["/id".to_string()],
resource_path: Vec::new(), // resource_path is deprecated and unused
}
}

Expand Down
13 changes: 5 additions & 8 deletions src/pull.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,7 @@ use std::collections::BTreeMap;

use crate::transactor::Emitter;
use anyhow::Context;
use proto_flow::{
capture::{request::Open, response::Opened, Response},
flow::CaptureSpec,
};
use proto_flow::capture::{request::Open, response::Opened, Response};
use serde::{Deserialize, Serialize};

use time::OffsetDateTime;
Expand All @@ -32,11 +29,11 @@ pub async fn do_pull(
mut stdout: io::Stdout,
) -> anyhow::Result<()> {
tracing::info!("starting to pull");
let Some(CaptureSpec { config_json, bindings, .. }) = capture else {
let Some(spec) = capture else {
anyhow::bail!("open request is missing capture spec");
};

let config = serde_json::from_str::<EndpointConfig>(&config_json)
let config = serde_json::from_str::<EndpointConfig>(&spec.config_json)
.context("deserializing endpoint config")?;

let fetcher = Fetcher::new(config.auth_token);
Expand All @@ -48,7 +45,7 @@ pub async fn do_pull(
};

let mut binding_indices = BTreeMap::new();
for (i, binding) in bindings.iter().enumerate() {
for (i, binding) in spec.bindings.iter().enumerate() {
let collection_name = binding
.collection
.as_ref()
Expand Down Expand Up @@ -190,7 +187,7 @@ async fn next_replays(

todo_groups.retain(|g| !g.is_done());

let Some(grp) = todo_groups.front_mut() else {
let Some(grp) = todo_groups.front_mut() else {
return Ok(None);
};

Expand Down
18 changes: 18 additions & 0 deletions src/snapshots/rl_stats_ingester__test__endpoint_config_schema.snap
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
---
source: src/lib.rs
expression: schema
---
{
"$schema": "https://json-schema.org/draft/2019-09/schema",
"title": "EndpointConfig",
"type": "object",
"required": [
"authToken"
],
"properties": {
"authToken": {
"description": "Authentication token for the ballchasing api.\n\nIf you don't have one, get one by visiting: https://ballchasing.com/login",
"type": "string"
}
}
}
18 changes: 18 additions & 0 deletions src/snapshots/rl_stats_ingester__test__resource_config_schema.snap
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
---
source: src/lib.rs
expression: schema
---
{
"$schema": "https://json-schema.org/draft/2019-09/schema",
"title": "ResourceConfig",
"type": "object",
"required": [
"creatorId"
],
"properties": {
"creatorId": {
"description": "The creator id to filter replays in ballchasing. Only replays in groups for this creator will be ingested.",
"type": "string"
}
}
}

0 comments on commit 5911974

Please sign in to comment.