From 4b00d17426b5f7774dafae2d5dc2ecece3ed2887 Mon Sep 17 00:00:00 2001 From: Phil Date: Tue, 12 Nov 2024 16:23:00 -0500 Subject: [PATCH] agent: refactor Connectors to DiscoverConnectors Refactors the `Connectors` trait to be more high-level and specific to Discovers. Also refactors the `MockConnectors` that are used by integration tests to simplify that, since it's not a more specific trait. --- crates/agent/src/controlplane.rs | 8 +- crates/agent/src/discovers.rs | 8 +- crates/agent/src/discovers/handler.rs | 6 +- .../src/integration_tests/auto_discovers.rs | 24 ++-- crates/agent/src/integration_tests/harness.rs | 12 +- .../integration_tests/harness/connectors.rs | 124 +++++------------- .../src/integration_tests/user_discovers.rs | 6 +- crates/agent/src/lib.rs | 2 +- crates/agent/src/proxy_connectors.rs | 60 ++------- 9 files changed, 75 insertions(+), 175 deletions(-) diff --git a/crates/agent/src/controlplane.rs b/crates/agent/src/controlplane.rs index 3153004535..58e5ff7326 100644 --- a/crates/agent/src/controlplane.rs +++ b/crates/agent/src/controlplane.rs @@ -14,7 +14,7 @@ use crate::{ DefaultRetryPolicy, DraftPublication, NoopFinalize, PublicationResult, Publisher, UpdateInferredSchemas, }, - Connectors, DiscoverHandler, + DiscoverConnectors, DiscoverHandler, }; macro_rules! unwrap_single { @@ -167,7 +167,7 @@ fn set_of>(s: T) -> BTreeSet { /// Implementation of `ControlPlane` that connects directly to postgres. #[derive(Clone)] -pub struct PGControlPlane { +pub struct PGControlPlane { pub pool: sqlx::PgPool, pub system_user_id: Uuid, pub publications_handler: Publisher, @@ -175,7 +175,7 @@ pub struct PGControlPlane { pub discovers_handler: DiscoverHandler, } -impl PGControlPlane { +impl PGControlPlane { pub fn new( pool: sqlx::PgPool, system_user_id: Uuid, @@ -255,7 +255,7 @@ impl PGControlPlane { } #[async_trait::async_trait] -impl ControlPlane for PGControlPlane { +impl ControlPlane for PGControlPlane { #[tracing::instrument(level = "debug", err, skip(self))] async fn notify_dependents(&mut self, catalog_name: String) -> anyhow::Result<()> { let now = self.current_time(); diff --git a/crates/agent/src/discovers.rs b/crates/agent/src/discovers.rs index 23edd3b908..705a7489f7 100644 --- a/crates/agent/src/discovers.rs +++ b/crates/agent/src/discovers.rs @@ -1,6 +1,6 @@ use std::collections::{BTreeMap, HashSet}; -use crate::proxy_connectors::Connectors; +use crate::proxy_connectors::DiscoverConnectors; use anyhow::Context; use models::split_image_tag; @@ -158,13 +158,13 @@ pub struct DiscoverHandler { pub connectors: C, } -impl DiscoverHandler { +impl DiscoverHandler { pub fn new(connectors: C) -> Self { Self { connectors } } } -impl DiscoverHandler { +impl DiscoverHandler { #[tracing::instrument(skip_all, fields( capture_name = %req.capture_name, data_plane_name = %req.data_plane.data_plane_name, @@ -234,7 +234,7 @@ impl DiscoverHandler { let result = self .connectors - .unary_capture(request, logs_token, task, &data_plane) + .discover(request, logs_token, task, &data_plane) .await; let response = match result { diff --git a/crates/agent/src/discovers/handler.rs b/crates/agent/src/discovers/handler.rs index cc9f51c412..e9142dd9dc 100644 --- a/crates/agent/src/discovers/handler.rs +++ b/crates/agent/src/discovers/handler.rs @@ -1,5 +1,5 @@ use super::{Discover, DiscoverHandler}; -use crate::{draft, proxy_connectors::Connectors, HandleResult, Handler, Id}; +use crate::{draft, proxy_connectors::DiscoverConnectors, HandleResult, Handler, Id}; use agent_sql::discovers::Row; use anyhow::Context; use serde::{Deserialize, Serialize}; @@ -36,7 +36,7 @@ impl JobStatus { } #[async_trait::async_trait] -impl Handler for DiscoverHandler { +impl Handler for DiscoverHandler { async fn handle( &mut self, pg_pool: &sqlx::PgPool, @@ -64,7 +64,7 @@ impl Handler for DiscoverHandler { } } -impl DiscoverHandler { +impl DiscoverHandler { #[tracing::instrument(err, skip_all, fields(id=?row.id, draft_id = ?row.draft_id, user_id = %row.user_id))] async fn process( &mut self, diff --git a/crates/agent/src/integration_tests/auto_discovers.rs b/crates/agent/src/integration_tests/auto_discovers.rs index 2baaf6dc08..2b9c0d0ea5 100644 --- a/crates/agent/src/integration_tests/auto_discovers.rs +++ b/crates/agent/src/integration_tests/auto_discovers.rs @@ -121,7 +121,7 @@ async fn test_auto_discovers_add_new_bindings() { harness .discover_handler .connectors - .mock_discover(Box::new(Ok(discovered))); + .mock_discover("marmots/capture", Ok(discovered)); tokio::time::sleep(AUTO_DISCOVER_WAIT).await; harness.run_pending_controller("marmots/capture").await; @@ -226,7 +226,7 @@ async fn test_auto_discovers_add_new_bindings() { harness .discover_handler .connectors - .mock_discover(Box::new(Ok(discovered))); + .mock_discover("marmots/capture", Ok(discovered)); tokio::time::sleep(AUTO_DISCOVER_WAIT).await; harness.run_pending_controller("marmots/capture").await; @@ -390,7 +390,7 @@ async fn test_auto_discovers_no_evolution() { draft_id, r#"{"hee": "hawwww"}"#, false, - Box::new(Ok(discovered.clone())), + Ok(discovered.clone()), ) .await; assert!(result.job_status.is_success()); @@ -418,7 +418,7 @@ async fn test_auto_discovers_no_evolution() { harness .discover_handler .connectors - .mock_discover(Box::new(Ok(new_discovered))); + .mock_discover("mules/capture", Ok(new_discovered)); harness.run_pending_controller("mules/capture").await; let capture_state = harness.get_controller_state("mules/capture").await; @@ -504,7 +504,7 @@ async fn test_auto_discovers_no_evolution() { harness .discover_handler .connectors - .mock_discover(Box::new(Ok(discovered))); + .mock_discover("mules/capture", Ok(discovered)); harness.run_pending_controller("mules/capture").await; let capture_state = harness.get_controller_state("mules/capture").await; @@ -693,7 +693,7 @@ async fn test_auto_discovers_update_only() { harness .discover_handler .connectors - .mock_discover(Box::new(Ok(discovered))); + .mock_discover("pikas/capture", Ok(discovered)); harness.run_pending_controller("pikas/capture").await; let capture_state = harness.get_controller_state("pikas/capture").await; @@ -742,10 +742,10 @@ async fn test_auto_discovers_update_only() { assert!(last_success.publish_result.is_none()); // Now simulate a discover error, and expect to see the error status reported. - harness - .discover_handler - .connectors - .mock_discover(Box::new(Err("a simulated discover error".to_string()))); + harness.discover_handler.connectors.mock_discover( + "pikas/capture", + Err("a simulated discover error".to_string()), + ); tokio::time::sleep(AUTO_DISCOVER_WAIT).await; harness.run_pending_controller("pikas/capture").await; @@ -791,7 +791,7 @@ async fn test_auto_discovers_update_only() { harness .discover_handler .connectors - .mock_discover(Box::new(Ok(discovered))); + .mock_discover("pikas/capture", Ok(discovered)); harness.control_plane().fail_next_build( "pikas/capture", InjectBuildError::new( @@ -860,7 +860,7 @@ async fn test_auto_discovers_update_only() { harness .discover_handler .connectors - .mock_discover(Box::new(Ok(discovered))); + .mock_discover("pikas/capture", Ok(discovered)); tokio::time::sleep(AUTO_DISCOVER_WAIT).await; harness.run_pending_controller("pikas/capture").await; diff --git a/crates/agent/src/integration_tests/harness.rs b/crates/agent/src/integration_tests/harness.rs index 87ecccb0ca..1d3b5131e9 100644 --- a/crates/agent/src/integration_tests/harness.rs +++ b/crates/agent/src/integration_tests/harness.rs @@ -24,7 +24,7 @@ use sqlx::types::Uuid; use tables::DraftRow; use tempfile::tempdir; -use self::connectors::MockConnectors; +use self::connectors::MockDiscoverConnectors; const FIXED_DATABASE_URL: &str = "postgresql://postgres:postgres@localhost:5432/postgres"; @@ -116,7 +116,7 @@ pub struct TestHarness { #[allow(dead_code)] // only here so we don't drop it until the harness is dropped pub builds_root: tempfile::TempDir, pub controllers: ControllerHandler, - pub discover_handler: DiscoverHandler, + pub discover_handler: DiscoverHandler, } impl TestHarness { @@ -150,7 +150,7 @@ impl TestHarness { }); let id_gen = models::IdGenerator::new(1); - let mock_connectors = connectors::MockConnectors::default(); + let mock_connectors = connectors::MockDiscoverConnectors::default(); let discover_handler = DiscoverHandler::new(mock_connectors); let publisher = Publisher::new( @@ -761,7 +761,7 @@ impl TestHarness { self.discover_handler .connectors - .mock_discover(mock_discover_resp); + .mock_discover(capture_name, mock_discover_resp); let result = self .discover_handler @@ -1065,7 +1065,7 @@ impl FailBuild for InjectBuildError { /// A wrapper around `PGControlPlane` that has a few basic capbilities for verifying /// activation calls and simulating failures of activations and publications. pub struct TestControlPlane { - inner: PGControlPlane, + inner: PGControlPlane, activations: Vec, fail_activations: BTreeSet, build_failures: InjectBuildFailures, @@ -1098,7 +1098,7 @@ impl crate::publications::FinalizeBuild for InjectBuildFailures { } impl TestControlPlane { - fn new(inner: PGControlPlane) -> Self { + fn new(inner: PGControlPlane) -> Self { Self { inner, activations: Vec::new(), diff --git a/crates/agent/src/integration_tests/harness/connectors.rs b/crates/agent/src/integration_tests/harness/connectors.rs index a184f8fadf..98f20b0883 100644 --- a/crates/agent/src/integration_tests/harness/connectors.rs +++ b/crates/agent/src/integration_tests/harness/connectors.rs @@ -1,112 +1,54 @@ -use crate::proxy_connectors::Connectors; +use crate::proxy_connectors::DiscoverConnectors; use proto_flow::capture; -use std::fmt::Debug; +use std::collections::HashMap; use std::sync::{Arc, Mutex}; -pub trait MockCall: Send + Sync + 'static { - fn call( - &self, - req: Req, - logs_token: uuid::Uuid, - task: ops::ShardRef, - data_plane: &tables::DataPlane, - ) -> anyhow::Result; -} - -impl MockCall for Result -where - Resp: Clone + Send + Sync + 'static, -{ - fn call( - &self, - _req: Req, - _logs_token: uuid::Uuid, - _task: ops::ShardRef, - _data_plane: &tables::DataPlane, - ) -> anyhow::Result { - self.clone().map_err(anyhow::Error::msg) - } -} - -struct DefaultFail; -impl MockCall for DefaultFail -where - Req: Debug, -{ - fn call( - &self, - req: Req, - _logs_token: uuid::Uuid, - _task: ops::ShardRef, - _data_plane: &tables::DataPlane, - ) -> anyhow::Result { - Err(anyhow::anyhow!("default mock failure for request: {req:?}")) - } -} - -pub type MockDiscover = - Box>; +pub type MockDiscover = Result; #[derive(Clone)] -pub struct MockConnectors { - discover: Arc>, +pub struct MockDiscoverConnectors { + mocks: Arc>>, } -impl Default for MockConnectors { +impl Default for MockDiscoverConnectors { fn default() -> Self { - MockConnectors { - discover: Arc::new(Mutex::new(Box::new(DefaultFail))), + MockDiscoverConnectors { + mocks: Arc::new(Mutex::new(HashMap::new())), } } } -impl MockConnectors { - pub fn mock_discover(&mut self, respond: MockDiscover) { - let mut lock = self.discover.lock().unwrap(); - *lock = respond; +impl MockDiscoverConnectors { + pub fn mock_discover(&mut self, capture_name: &str, respond: MockDiscover) { + let mut lock = self.mocks.lock().unwrap(); + lock.insert(models::Capture::new(capture_name), respond); } } -/// Currently, `MockConnectors` only supports capture Discover RPCs. -/// Publications do not yet use this for validate RPCs, but the plan is to do -/// that at some point, so that we can more easily test the publication logic. -impl Connectors for MockConnectors { - async fn unary_capture<'a>( +impl DiscoverConnectors for MockDiscoverConnectors { + async fn discover<'a>( &'a self, mut req: capture::Request, - logs_token: uuid::Uuid, - task: ops::ShardRef, - data_plane: &'a tables::DataPlane, - ) -> anyhow::Result { - if let Some(discover) = req.discover.take() { - let locked = self.discover.lock().unwrap(); - return locked - .call(discover, logs_token, task, data_plane) - .map(|resp| capture::Response { - discovered: Some(resp), - ..Default::default() - }); - } - Err(anyhow::anyhow!("unhandled capture request type: {req:?}")) - } - - async fn unary_derive<'a>( - &'a self, - _req: proto_flow::derive::Request, - _logs_token: uuid::Uuid, - _task: ops::ShardRef, - _data_plane: &'a tables::DataPlane, - ) -> anyhow::Result { - unimplemented!("mock connectors do not yet handle unary_derive calls"); - } - - async fn unary_materialize<'a>( - &'a self, - _req: proto_flow::materialize::Request, _logs_token: uuid::Uuid, - _task: ops::ShardRef, + task: ops::ShardRef, _data_plane: &'a tables::DataPlane, - ) -> anyhow::Result { - unimplemented!("mock connectors do not yet handle unary_materialize calls"); + ) -> anyhow::Result { + let Some(discover) = req.discover.take() else { + anyhow::bail!("unexpected capture request type: {req:?}") + }; + + let locked = self.mocks.lock().unwrap(); + let capture = models::Capture::new(&task.name); + let Some(mock) = locked.get(&capture) else { + anyhow::bail!("no mock for capture: {capture}"); + }; + + tracing::debug!(req = ?discover, resp = ?mock, "responding with mock discovered response"); + mock.clone() + .map_err(|err_str| anyhow::anyhow!("{err_str}")) + .map(|dr| capture::Response { + discovered: Some(dr), + ..Default::default() + }) } } diff --git a/crates/agent/src/integration_tests/user_discovers.rs b/crates/agent/src/integration_tests/user_discovers.rs index 5d33859304..57aa138b1f 100644 --- a/crates/agent/src/integration_tests/user_discovers.rs +++ b/crates/agent/src/integration_tests/user_discovers.rs @@ -53,7 +53,7 @@ async fn test_user_discovers() { draft_id, endpoint_config, false, - Box::new(Ok::(initial_resp)), + Ok(initial_resp), ) .await; assert!( @@ -217,7 +217,7 @@ async fn test_user_discovers() { draft_id, endpoint_config, true, - Box::new(Ok(next_discover.clone())), + Ok(next_discover.clone()), ) .await; assert!(result.job_status.is_success()); @@ -292,7 +292,7 @@ async fn test_user_discovers() { draft_id, endpoint_config, true, - Box::new(Ok(next_discover)), + Ok(next_discover), ) .await; assert!(result.job_status.is_success()); diff --git a/crates/agent/src/lib.rs b/crates/agent/src/lib.rs index 685af17ae0..bb74715d30 100644 --- a/crates/agent/src/lib.rs +++ b/crates/agent/src/lib.rs @@ -25,7 +25,7 @@ pub use discovers::DiscoverHandler; pub use evolution::EvolutionHandler; pub use handlers::{serve, HandleResult, Handler}; use lazy_static::lazy_static; -pub use proxy_connectors::{Connectors, DataPlaneConnectors, ProxyConnectors}; +pub use proxy_connectors::{DataPlaneConnectors, DiscoverConnectors, ProxyConnectors}; use regex::Regex; // Used during tests. diff --git a/crates/agent/src/proxy_connectors.rs b/crates/agent/src/proxy_connectors.rs index 1278a304e1..92c544aecd 100644 --- a/crates/agent/src/proxy_connectors.rs +++ b/crates/agent/src/proxy_connectors.rs @@ -5,34 +5,17 @@ use proto_flow::{capture, derive, flow::materialization_spec, materialize}; use std::future::Future; use uuid::Uuid; -/// Trait for performing unary connector RPCs from the control plane, which handles logging. -pub trait Connectors: Clone + Send + Sync + 'static { - fn unary_capture<'a>( +/// Trait for performing Discover operations from the control plane, which handles logging. +pub trait DiscoverConnectors: Clone + Send + Sync + 'static { + fn discover<'a>( &'a self, req: capture::Request, logs_token: Uuid, task: ops::ShardRef, data_plane: &'a tables::DataPlane, ) -> impl Future> + 'a + Send; - - fn unary_derive<'a>( - &'a self, - req: derive::Request, - logs_token: Uuid, - task: ops::ShardRef, - data_plane: &'a tables::DataPlane, - ) -> impl Future> + 'a + Send; - - fn unary_materialize<'a>( - &'a self, - req: materialize::Request, - logs_token: Uuid, - task: ops::ShardRef, - data_plane: &'a tables::DataPlane, - ) -> impl Future> + 'a + Send; } -// TODO: better name? #[derive(Debug, Clone)] pub struct DataPlaneConnectors { logs_tx: logs::Tx, @@ -43,14 +26,18 @@ impl DataPlaneConnectors { } } -impl Connectors for DataPlaneConnectors { - async fn unary_capture<'a>( +impl DiscoverConnectors for DataPlaneConnectors { + async fn discover<'a>( &'a self, req: capture::Request, logs_token: Uuid, task: ops::ShardRef, data_plane: &'a tables::DataPlane, ) -> anyhow::Result { + assert!( + req.discover.is_some(), + "expected a discover request, got: {req:?}" + ); let log_handler = logs::ops_handler( self.logs_tx.clone(), "unary_capture".to_string(), @@ -59,35 +46,6 @@ impl Connectors for DataPlaneConnectors { let proxy = ProxyConnectors::new(log_handler); proxy.unary_capture(data_plane, task, req).await } - - async fn unary_derive<'a>( - &'a self, - req: derive::Request, - logs_token: Uuid, - task: ops::ShardRef, - data_plane: &'a tables::DataPlane, - ) -> anyhow::Result { - let log_handler = - logs::ops_handler(self.logs_tx.clone(), "unary_derive".to_string(), logs_token); - let proxy = ProxyConnectors::new(log_handler); - proxy.unary_derive(data_plane, task, req).await - } - - async fn unary_materialize<'a>( - &'a self, - req: materialize::Request, - logs_token: Uuid, - task: ops::ShardRef, - data_plane: &'a tables::DataPlane, - ) -> anyhow::Result { - let log_handler = logs::ops_handler( - self.logs_tx.clone(), - "unary_materialize".to_string(), - logs_token, - ); - let proxy = ProxyConnectors::new(log_handler); - proxy.unary_materialize(data_plane, task, req).await - } } pub struct ProxyConnectors {