Skip to content

Commit

Permalink
refactor: rename FlowTaskId to FlowId
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Apr 29, 2024
1 parent 3b10465 commit cfdeb47
Show file tree
Hide file tree
Showing 8 changed files with 37 additions and 41 deletions.
4 changes: 2 additions & 2 deletions src/common/meta/src/ddl/create_flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use crate::ddl::utils::handle_retry_error;
use crate::ddl::DdlContext;
use crate::error::Result;
use crate::key::flow::flow_info::FlowInfoValue;
use crate::key::FlowTaskId;
use crate::key::FlowId;
use crate::lock_key::{CatalogLock, FlowNameLock, SchemaLock, TableNameLock};
use crate::peer::Peer;
use crate::rpc::ddl::CreateFlowTask;
Expand Down Expand Up @@ -195,7 +195,7 @@ pub struct CreateFlowTaskData {
pub(crate) cluster_id: ClusterId,
pub(crate) state: CreateFlowTaskState,
pub(crate) task: CreateFlowTask,
pub(crate) flow_id: Option<FlowTaskId>,
pub(crate) flow_id: Option<FlowId>,
pub(crate) peers: Vec<Peer>,
pub(crate) source_table_ids: Vec<TableId>,
}
Expand Down
8 changes: 4 additions & 4 deletions src/common/meta/src/ddl/flow_meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::sync::Arc;
use tonic::async_trait;

use crate::error::Result;
use crate::key::FlowTaskId;
use crate::key::FlowId;
use crate::peer::Peer;
use crate::sequence::SequenceRef;

Expand All @@ -43,13 +43,13 @@ impl FlowMetadataAllocator {
}

/// Allocates a the [FlowTaskId].
pub(crate) async fn allocate_flow_id(&self) -> Result<FlowTaskId> {
let flow_id = self.flow_id_sequence.next().await? as FlowTaskId;
pub(crate) async fn allocate_flow_id(&self) -> Result<FlowId> {
let flow_id = self.flow_id_sequence.next().await? as FlowId;
Ok(flow_id)
}

/// Allocates the [FlowTaskId] and [Peer]s.
pub async fn create(&self, partitions: usize) -> Result<(FlowTaskId, Vec<Peer>)> {
pub async fn create(&self, partitions: usize) -> Result<(FlowId, Vec<Peer>)> {
let flow_id = self.allocate_flow_id().await?;
let peers = self.partition_peer_allocator.alloc(partitions).await?;

Expand Down
2 changes: 1 addition & 1 deletion src/common/meta/src/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ pub const CACHE_KEY_PREFIXES: [&str; 4] = [
pub type RegionDistribution = BTreeMap<DatanodeId, Vec<RegionNumber>>;

/// The id of flow.
pub type FlowTaskId = u32;
pub type FlowId = u32;
/// The partition of flow.
pub type FlowPartitionId = u32;

Expand Down
4 changes: 2 additions & 2 deletions src/common/meta/src/key/flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use crate::key::flow::flow_name::FlowNameManager;
use crate::key::flow::flownode_flow::FlownodeFlowManager;
use crate::key::flow::table_flow::TableFlowManager;
use crate::key::txn_helper::TxnOpGetResponseSet;
use crate::key::{FlowTaskId, MetaKey};
use crate::key::{FlowId, MetaKey};
use crate::kv_backend::txn::Txn;
use crate::kv_backend::KvBackendRef;

Expand Down Expand Up @@ -131,7 +131,7 @@ impl FlowMetadataManager {
/// Creates metadata for flow and returns an error if different metadata exists.
pub async fn create_flow_metadata(
&self,
flow_id: FlowTaskId,
flow_id: FlowId,
flow_value: FlowInfoValue,
) -> Result<()> {
let (create_flow_flow_name_txn, on_create_flow_flow_name_failure) = self
Expand Down
16 changes: 8 additions & 8 deletions src/common/meta/src/key/flow/flow_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use crate::error::{self, Result};
use crate::key::flow::FlowScoped;
use crate::key::txn_helper::TxnOpGetResponseSet;
use crate::key::{
txn_helper, DeserializedValueWithBytes, FlowPartitionId, FlowTaskId, MetaKey, TableMetaValue,
txn_helper, DeserializedValueWithBytes, FlowId, FlowPartitionId, MetaKey, TableMetaValue,
};
use crate::kv_backend::txn::Txn;
use crate::kv_backend::KvBackendRef;
Expand Down Expand Up @@ -57,26 +57,26 @@ impl MetaKey<FlowInfoKey> for FlowInfoKey {

impl FlowInfoKey {
/// Returns the [FlowInfoKey].
pub fn new(flow_id: FlowTaskId) -> FlowInfoKey {
pub fn new(flow_id: FlowId) -> FlowInfoKey {
let inner = FlowInfoKeyInner::new(flow_id);
FlowInfoKey(FlowScoped::new(inner))
}

/// Returns the [FlowTaskId].
pub fn flow_id(&self) -> FlowTaskId {
pub fn flow_id(&self) -> FlowId {
self.0.flow_id
}
}

/// The key of flow metadata.
#[derive(Debug, Clone, Copy, PartialEq)]
struct FlowInfoKeyInner {
flow_id: FlowTaskId,
flow_id: FlowId,
}

impl FlowInfoKeyInner {
/// Returns a [FlowInfoKey] with the specified `flow_id`.
pub fn new(flow_id: FlowTaskId) -> FlowInfoKeyInner {
pub fn new(flow_id: FlowId) -> FlowInfoKeyInner {
FlowInfoKeyInner { flow_id }
}
}
Expand All @@ -103,7 +103,7 @@ impl MetaKey<FlowInfoKeyInner> for FlowInfoKeyInner {
err_msg: format!("Invalid FlowInfoKeyInner '{key}'"),
})?;
// Safety: pass the regex check above
let flow_id = captures[1].parse::<FlowTaskId>().unwrap();
let flow_id = captures[1].parse::<FlowId>().unwrap();
Ok(FlowInfoKeyInner { flow_id })
}
}
Expand Down Expand Up @@ -155,7 +155,7 @@ impl FlowInfoManager {
}

/// Returns the [FlowTaskValue] of specified `flow_id`.
pub async fn get(&self, flow_id: FlowTaskId) -> Result<Option<FlowInfoValue>> {
pub async fn get(&self, flow_id: FlowId) -> Result<Option<FlowInfoValue>> {
let key = FlowInfoKey::new(flow_id).to_bytes();
self.kv_backend
.get(&key)
Expand All @@ -169,7 +169,7 @@ impl FlowInfoManager {
/// Otherwise, the transaction will retrieve existing value.
pub(crate) fn build_create_txn(
&self,
flow_id: FlowTaskId,
flow_id: FlowId,
flow_value: &FlowInfoValue,
) -> Result<(
Txn,
Expand Down
10 changes: 5 additions & 5 deletions src/common/meta/src/key/flow/flow_name.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use crate::error::{self, Result};
use crate::key::flow::FlowScoped;
use crate::key::txn_helper::TxnOpGetResponseSet;
use crate::key::{
txn_helper, DeserializedValueWithBytes, FlowTaskId, MetaKey, TableMetaValue, NAME_PATTERN,
txn_helper, DeserializedValueWithBytes, FlowId, MetaKey, TableMetaValue, NAME_PATTERN,
};
use crate::kv_backend::txn::Txn;
use crate::kv_backend::KvBackendRef;
Expand Down Expand Up @@ -125,17 +125,17 @@ impl FlowNameKeyInner {
/// The value of [FlowNameKey].
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
pub struct FlowNameValue {
flow_id: FlowTaskId,
flow_id: FlowId,
}

impl FlowNameValue {
/// Returns a [FlowNameValue] with specified [FlowTaskId].
pub fn new(flow_id: FlowTaskId) -> Self {
pub fn new(flow_id: FlowId) -> Self {
Self { flow_id }
}

/// Returns the [FlowTaskId]
pub fn flow_id(&self) -> FlowTaskId {
pub fn flow_id(&self) -> FlowId {
self.flow_id
}
}
Expand Down Expand Up @@ -176,7 +176,7 @@ impl FlowNameManager {
&self,
catalog_name: &str,
flow_name: &str,
flow_id: FlowTaskId,
flow_id: FlowId,
) -> Result<(
Txn,
impl FnOnce(
Expand Down
20 changes: 8 additions & 12 deletions src/common/meta/src/key/flow/flownode_flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use snafu::OptionExt;

use crate::error::{self, Result};
use crate::key::flow::FlowScoped;
use crate::key::{BytesAdapter, FlowPartitionId, FlowTaskId, MetaKey};
use crate::key::{BytesAdapter, FlowId, FlowPartitionId, MetaKey};
use crate::kv_backend::txn::{Txn, TxnOp};
use crate::kv_backend::KvBackendRef;
use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE};
Expand Down Expand Up @@ -60,7 +60,7 @@ impl FlownodeFlowKey {
/// Returns a new [FlownodeFlowKey].
pub fn new(
flownode_id: FlownodeId,
flow_id: FlowTaskId,
flow_id: FlowId,
partition_id: FlowPartitionId,
) -> FlownodeFlowKey {
let inner = FlownodeFlowKeyInner::new(flownode_id, flow_id, partition_id);
Expand All @@ -76,7 +76,7 @@ impl FlownodeFlowKey {
}

/// Returns the [FlowTaskId].
pub fn flow_id(&self) -> FlowTaskId {
pub fn flow_id(&self) -> FlowId {
self.0.flow_id
}

Expand All @@ -94,17 +94,13 @@ impl FlownodeFlowKey {
/// The key of mapping [FlownodeId] to [FlowTaskId].
pub struct FlownodeFlowKeyInner {
flownode_id: FlownodeId,
flow_id: FlowTaskId,
flow_id: FlowId,
partition_id: FlowPartitionId,
}

impl FlownodeFlowKeyInner {
/// Returns a [FlownodeFlowKey] with the specified `flownode_id`, `flow_id` and `partition_id`.
pub fn new(
flownode_id: FlownodeId,
flow_id: FlowTaskId,
partition_id: FlowPartitionId,
) -> Self {
pub fn new(flownode_id: FlownodeId, flow_id: FlowId, partition_id: FlowPartitionId) -> Self {
Self {
flownode_id,
flow_id,
Expand Down Expand Up @@ -149,7 +145,7 @@ impl MetaKey<FlownodeFlowKeyInner> for FlownodeFlowKeyInner {
})?;
// Safety: pass the regex check above
let flownode_id = captures[1].parse::<FlownodeId>().unwrap();
let flow_id = captures[2].parse::<FlowTaskId>().unwrap();
let flow_id = captures[2].parse::<FlowId>().unwrap();
let partition_id = captures[3].parse::<FlowPartitionId>().unwrap();

Ok(FlownodeFlowKeyInner {
Expand Down Expand Up @@ -180,7 +176,7 @@ impl FlownodeFlowManager {
pub fn tasks(
&self,
flownode_id: FlownodeId,
) -> BoxStream<'static, Result<(FlowTaskId, FlowPartitionId)>> {
) -> BoxStream<'static, Result<(FlowId, FlowPartitionId)>> {
let start_key = FlownodeFlowKey::range_start_key(flownode_id);
let req = RangeRequest::new().with_prefix(start_key);

Expand All @@ -199,7 +195,7 @@ impl FlownodeFlowManager {
/// Puts `__flownode_task/{flownode_id}/{flow_id}/{partition_id}` keys.
pub(crate) fn build_create_txn<I: IntoIterator<Item = (FlowPartitionId, FlownodeId)>>(
&self,
flow_id: FlowTaskId,
flow_id: FlowId,
flownode_ids: I,
) -> Txn {
let txns = flownode_ids
Expand Down
14 changes: 7 additions & 7 deletions src/common/meta/src/key/flow/table_flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use table::metadata::TableId;

use crate::error::{self, Result};
use crate::key::flow::FlowScoped;
use crate::key::{BytesAdapter, FlowPartitionId, FlowTaskId, MetaKey};
use crate::key::{BytesAdapter, FlowId, FlowPartitionId, MetaKey};
use crate::kv_backend::txn::{Txn, TxnOp};
use crate::kv_backend::KvBackendRef;
use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE};
Expand All @@ -44,7 +44,7 @@ lazy_static! {
struct TableFlowKeyInner {
table_id: TableId,
flownode_id: FlownodeId,
flow_id: FlowTaskId,
flow_id: FlowId,
partition_id: FlowPartitionId,
}

Expand All @@ -71,7 +71,7 @@ impl TableFlowKey {
pub fn new(
table_id: TableId,
flownode_id: FlownodeId,
flow_id: FlowTaskId,
flow_id: FlowId,
partition_id: FlowPartitionId,
) -> TableFlowKey {
let inner = TableFlowKeyInner::new(table_id, flownode_id, flow_id, partition_id);
Expand All @@ -91,7 +91,7 @@ impl TableFlowKey {
}

/// Returns the [FlowTaskId].
pub fn flow_id(&self) -> FlowTaskId {
pub fn flow_id(&self) -> FlowId {
self.0.flow_id
}

Expand All @@ -111,7 +111,7 @@ impl TableFlowKeyInner {
fn new(
table_id: TableId,
flownode_id: FlownodeId,
flow_id: FlowTaskId,
flow_id: FlowId,
partition_id: FlowPartitionId,
) -> TableFlowKeyInner {
Self {
Expand Down Expand Up @@ -160,7 +160,7 @@ impl MetaKey<TableFlowKeyInner> for TableFlowKeyInner {
// Safety: pass the regex check above
let table_id = captures[1].parse::<TableId>().unwrap();
let flownode_id = captures[2].parse::<FlownodeId>().unwrap();
let flow_id = captures[3].parse::<FlowTaskId>().unwrap();
let flow_id = captures[3].parse::<FlowId>().unwrap();
let partition_id = captures[4].parse::<FlowPartitionId>().unwrap();
Ok(TableFlowKeyInner::new(
table_id,
Expand Down Expand Up @@ -206,7 +206,7 @@ impl TableFlowManager {
/// Puts `__table_task/{table_id}/{node_id}/{partition_id}` keys.
pub fn build_create_txn<I: IntoIterator<Item = (FlowPartitionId, FlownodeId)>>(
&self,
flow_id: FlowTaskId,
flow_id: FlowId,
flownode_ids: I,
source_table_ids: &[TableId],
) -> Txn {
Expand Down

0 comments on commit cfdeb47

Please sign in to comment.