diff --git a/src/flow/src/adapter/node_context.rs b/src/flow/src/adapter/node_context.rs index 0b69e01855f2..82900aac3644 100644 --- a/src/flow/src/adapter/node_context.rs +++ b/src/flow/src/adapter/node_context.rs @@ -71,18 +71,14 @@ pub struct SourceSender { impl Default for SourceSender { fn default() -> Self { - Self::new() - } -} - -impl SourceSender { - pub fn new() -> Self { Self { sender: broadcast::Sender::new(BROADCAST_CAP), send_buf: Default::default(), } } +} +impl SourceSender { pub fn get_receiver(&self) -> broadcast::Receiver { self.sender.subscribe() } diff --git a/src/flow/src/adapter/table_source.rs b/src/flow/src/adapter/table_source.rs index 5d85664b1d00..e936a4303193 100644 --- a/src/flow/src/adapter/table_source.rs +++ b/src/flow/src/adapter/table_source.rs @@ -14,13 +14,16 @@ //! How to query table information from database +use common_error::ext::BoxedError; use common_meta::key::table_info::{TableInfoManager, TableInfoValue}; use common_meta::key::table_name::{TableNameKey, TableNameManager}; use itertools::Itertools; use snafu::{OptionExt, ResultExt}; use table::metadata::TableId; -use crate::adapter::error::{Error, TableNotFoundMetaSnafu, TableNotFoundSnafu}; +use crate::adapter::error::{ + Error, ExternalSnafu, TableNotFoundMetaSnafu, TableNotFoundSnafu, UnexpectedSnafu, +}; use crate::adapter::TableName; use crate::repr::{self, ColumnType, RelationType}; @@ -52,8 +55,11 @@ impl TableSource { .await .with_context(|_| TableNotFoundMetaSnafu { msg: format!("Table name = {:?}, couldn't found table id", name), + })? + .with_context(|| UnexpectedSnafu { + reason: format!("Table name = {:?}, couldn't found table id", name), }) - .map(|id| id.unwrap().table_id()) + .map(|id| id.table_id()) } /// If the table havn't been created in database, the tableId returned would be null @@ -74,10 +80,12 @@ impl TableSource { self.table_info_manager .get(*table_id) .await - .with_context(|_| TableNotFoundMetaSnafu { - msg: format!("TableId = {:?}, couldn't found table name", table_id), + .map_err(BoxedError::new) + .context(ExternalSnafu)? + .with_context(|| UnexpectedSnafu { + reason: format!("Table id = {:?}, couldn't found table name", table_id), }) - .map(|name| name.unwrap().table_name()) + .map(|name| name.table_name()) .map(|name| [name.catalog_name, name.schema_name, name.table_name]) } /// query metasrv about the table name and table id