Skip to content

Commit

Permalink
feat: remove new&unwrap
Browse files Browse the repository at this point in the history
  • Loading branch information
discord9 committed May 15, 2024
1 parent 97104b9 commit 8928b0b
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 11 deletions.
8 changes: 2 additions & 6 deletions src/flow/src/adapter/node_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,18 +71,14 @@ pub struct SourceSender {

impl Default for SourceSender {
fn default() -> Self {
Self::new()
}
}

impl SourceSender {
pub fn new() -> Self {
Self {
sender: broadcast::Sender::new(BROADCAST_CAP),
send_buf: Default::default(),
}
}
}

impl SourceSender {
pub fn get_receiver(&self) -> broadcast::Receiver<DiffRow> {
self.sender.subscribe()
}
Expand Down
18 changes: 13 additions & 5 deletions src/flow/src/adapter/table_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,16 @@

//! How to query table information from database

use common_error::ext::BoxedError;
use common_meta::key::table_info::{TableInfoManager, TableInfoValue};
use common_meta::key::table_name::{TableNameKey, TableNameManager};
use itertools::Itertools;
use snafu::{OptionExt, ResultExt};
use table::metadata::TableId;

use crate::adapter::error::{Error, TableNotFoundMetaSnafu, TableNotFoundSnafu};
use crate::adapter::error::{
Error, ExternalSnafu, TableNotFoundMetaSnafu, TableNotFoundSnafu, UnexpectedSnafu,
};
use crate::adapter::TableName;
use crate::repr::{self, ColumnType, RelationType};

Expand Down Expand Up @@ -52,8 +55,11 @@ impl TableSource {
.await
.with_context(|_| TableNotFoundMetaSnafu {
msg: format!("Table name = {:?}, couldn't found table id", name),
})?
.with_context(|| UnexpectedSnafu {
reason: format!("Table name = {:?}, couldn't found table id", name),
})
.map(|id| id.unwrap().table_id())
.map(|id| id.table_id())
}

/// If the table havn't been created in database, the tableId returned would be null
Expand All @@ -74,10 +80,12 @@ impl TableSource {
self.table_info_manager
.get(*table_id)
.await
.with_context(|_| TableNotFoundMetaSnafu {
msg: format!("TableId = {:?}, couldn't found table name", table_id),
.map_err(BoxedError::new)
.context(ExternalSnafu)?
.with_context(|| UnexpectedSnafu {
reason: format!("Table id = {:?}, couldn't found table name", table_id),
})
.map(|name| name.unwrap().table_name())
.map(|name| name.table_name())
.map(|name| [name.catalog_name, name.schema_name, name.table_name])
}
/// query metasrv about the table name and table id
Expand Down

0 comments on commit 8928b0b

Please sign in to comment.