Skip to content

Commit

Permalink
chore: rename & update docs
Browse files Browse the repository at this point in the history
  • Loading branch information
hunjixin committed Aug 12, 2024
1 parent 8c38793 commit b77b739
Show file tree
Hide file tree
Showing 14 changed files with 68 additions and 202 deletions.
4 changes: 2 additions & 2 deletions crates/compute_unit_runner/src/data_tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use jiaoziflow::{
JobDbRepo,
TrackerState,
},
network::datatransfer::MediaDataBatchResponse,
network::datatransfer::DataBatch,
utils::k8s_helper::get_machine_name,
};
use std::{
Expand Down Expand Up @@ -91,7 +91,7 @@ where
// channel for receive finish state from user container
pub(crate) ipc_process_finish_state_tx: Option<MessageSender<(), ()>>,

pub(crate) incoming_tx: Option<MessageSender<MediaDataBatchResponse, ()>>,
pub(crate) incoming_tx: Option<MessageSender<DataBatch, ()>>,
}
impl<R> MediaDataTracker<R>
where
Expand Down
8 changes: 4 additions & 4 deletions crates/compute_unit_runner/src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use jiaoziflow::{
common::Empty,
datatransfer::{
data_stream_server::DataStream,
MediaDataBatchResponse,
DataBatch,
},
},
};
Expand Down Expand Up @@ -36,7 +36,7 @@ where
{
async fn transfer_media_data(
&self,
req: Request<MediaDataBatchResponse>,
req: Request<DataBatch>,
) -> Result<Response<Empty>, Status> {
let send_tx = {
let program = self.program.read().await;
Expand All @@ -47,7 +47,7 @@ where
};

let (tx, rx) = oneshot::channel::<Result<()>>();
let req: MediaDataBatchResponse = req.into_inner();
let req: DataBatch = req.into_inner();
if let Err(err) = send_tx.send((req, tx)).await {
return Err(Status::from_error(Box::new(err)));
}
Expand All @@ -62,7 +62,7 @@ where
async fn request_media_data(
&self,
_: Request<Empty>,
) -> Result<Response<MediaDataBatchResponse>, Status> {
) -> Result<Response<DataBatch>, Status> {
todo!();
}
}
18 changes: 9 additions & 9 deletions crates/nodes_sdk/src/fs_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use anyhow::{
use async_trait::async_trait;
use jiaoziflow::{
network::datatransfer::{
MediaDataBatchResponse,
DataBatch,
MediaDataCell,
},
utils::StdIntoAnyhowResult,
Expand All @@ -32,8 +32,8 @@ use walkdir::WalkDir;

#[async_trait]
pub trait FileCache: Send + Sync + 'static {
async fn write(&self, batch: MediaDataBatchResponse) -> Result<()>;
async fn read(&self, id: &str) -> Result<MediaDataBatchResponse>;
async fn write(&self, batch: DataBatch) -> Result<()>;
async fn read(&self, id: &str) -> Result<DataBatch>;
async fn remove(&self, id: &str) -> Result<()>;
async fn exit(&self, id: &str) -> Result<bool>;
}
Expand All @@ -53,7 +53,7 @@ impl FSCache {

#[async_trait]
impl FileCache for FSCache {
async fn write(&self, batch: MediaDataBatchResponse) -> Result<()> {
async fn write(&self, batch: DataBatch) -> Result<()> {
let now = Instant::now();
let tmp_in_path = self.path.join(&batch.id);
debug!(
Expand Down Expand Up @@ -88,10 +88,10 @@ impl FileCache for FSCache {
Ok(())
}

async fn read(&self, id: &str) -> Result<MediaDataBatchResponse> {
async fn read(&self, id: &str) -> Result<DataBatch> {
let now = Instant::now();
let tmp_out_path = self.path.join(id);
let mut new_batch = MediaDataBatchResponse::default();
let mut new_batch = DataBatch::default();
for entry in WalkDir::new(&tmp_out_path) {
match entry {
std::result::Result::Ok(entry) => {
Expand Down Expand Up @@ -134,7 +134,7 @@ impl FileCache for FSCache {
}

#[derive(Clone)]
pub struct MemCache(Arc<Mutex<HashMap<String, MediaDataBatchResponse>>>);
pub struct MemCache(Arc<Mutex<HashMap<String, DataBatch>>>);

impl Default for MemCache {
fn default() -> Self {
Expand All @@ -150,7 +150,7 @@ impl MemCache {

#[async_trait]
impl FileCache for MemCache {
async fn write(&self, batch: MediaDataBatchResponse) -> Result<()> {
async fn write(&self, batch: DataBatch) -> Result<()> {
let now = Instant::now();
let mut store = self.0.lock().await;
let id = batch.id.clone();
Expand All @@ -159,7 +159,7 @@ impl FileCache for MemCache {
Ok(())
}

async fn read(&self, id: &str) -> Result<MediaDataBatchResponse> {
async fn read(&self, id: &str) -> Result<DataBatch> {
let now = Instant::now();
let store = self.0.lock().await;
let result = match store.get(id) {
Expand Down
4 changes: 2 additions & 2 deletions crates/nodes_sdk/src/multi_sender.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use jiaoziflow::network::datatransfer::{
data_stream_client::DataStreamClient,
MediaDataBatchResponse,
DataBatch,
};
use tokio::time::Instant;
use tonic::transport::Channel;
Expand All @@ -25,7 +25,7 @@ impl MultiSender {
impl MultiSender {
pub async fn send(
&mut self,
val: MediaDataBatchResponse,
val: DataBatch,
sent_nodes: &[&str],
) -> Result<(), Vec<String>> {
let mut sent = vec![];
Expand Down
Binary file removed docs/images/compute_unit_pod.png
Binary file not shown.
Binary file removed docs/images/dag.png
Binary file not shown.
Binary file modified docs/images/job_detail.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
110 changes: 47 additions & 63 deletions docs/架构.md
Original file line number Diff line number Diff line change
@@ -1,93 +1,77 @@
# 架构设计
# 架构设计:jz-flow

jz-flow核心在于提供提供数据流功能,让数据能够沿着整个dag图谱进行流动。每当数据流过一个节点,节点根据自己的需要对数据流施加影响,改变数据的形态。每个节点可以读取流入本节点数据,处理数据,并生成新的数据输出到数据流当中
jz-flow 的核心在于提供高效的数据流功能,使数据能够在 DAG(有向无环图)中流动。每当数据流经一个节点时,该节点将对数据施加特定的影响,改变其形态。每个节点可以读取流入的数据,进行处理,并将处理后的数据输出到数据流中。借助 Kubernetes 强大的扩展和调度能力,本项目无需关注复杂的部署和调度问题,而是将重点放在图定义、数据协议设计以及节点实现上

得益于k8s强大的扩展调度能力,本项目不需要考虑复杂的部署,调度问题。而可以把重点放在图定义,节点间数据协议设计以及节点实现上。
1. dag图定义了用户镜像,命令,副本数,以及节点间前序后继的信息。
2. 数据协议保证了数据能够安全,无损的从上有节点传递到下游节点。
3. 节点实现定义了用户节点如何于核心数据流交互。即如何从数据流中获取数据,如何讲处理的结果输入到数据流当中。
## 核心组件

## 计算单元(ComputeUnit)
1. **DAG 图**
DAG 图定义了以下内容:
- 用户镜像
- 命令
- 副本数
- 节点间的前序和后续关系

执行节点用于对数据施加某种影响,实际使用中是在k8s当中启动一个或者多个节点,这些机器同时获取数据,处理数据。
计算单元是线上尽可能的支持自定义逻辑,用户可以根据需要需要操作表格,文件,视频,模型等。同时尽量提升节点的可复用性,希望能够尽可能的增强节点的复用性,避免重复工作
2. **数据协议**
数据协议保证数据能够安全、无损地从上游节点传递到下游节点,确保数据在传输过程中保持一致性

### 计算单元控制接口

1. status接口,用于或与计算单元的状态
1. start接口, 用于告知计算单元可以开始进行运算
2. pause接口, 用于告知计算单元停止计算
3. restart接口,用于告知处于暂停状态的计算单元恢复计算
4. stop接口, 用于告知计算单元当前任务已经结束

## 数据通道(DataChannel) (待定,可能会移除)

### 元数据库(MetaStore)

每个dag运行时都应当维护一个数据库,每个节点的每个批次数据都应当记录在数据库当中,数据批次经历Received, Processed, Clean的过程。在Received状态之前,前面的节点不能删除数据,在Processed之前,当前节点必须保存数据。确保数据不会因为机器物理原因丢失数据。

每次运行的数据记录在独立的库之中

1. global state 保存全局状态(grpah形态,graph 状态)
2. node state 记录每个节点的状态
3. **计算单元 (Compute Unit)**
计算单元用于对数据施加某种影响。在实际使用中,通过 Kubernetes 启动一个或多个计算节点,这些节点并行获取数据并处理数据。计算单元的设计重点是支持自定义逻辑,以适应不同的数据类型(如表格、文件、视频、模型等)。同时,为了增强复用性,计算单元应具备通用性,避免重复工作。

### 缓存数据(tempFileStore)

持久化的缓存方式: 防止极端情况下的系统崩溃,bug导致数据丢失。包括不限文件系统,对象存储,消息队列等。
内存缓冲: 速度快,抗风险能力差

## Scheduler

支持并行dag计算。每个计算单元和数据通道通过k8s部署,计算单元和数据通道之间通过网络通信进行沟通。

![dag](images/dag.png)
### 计算单元控制接口

scheduler根据graph从依赖图中自动插入数据通道,在从输入到输出依次部署事件单元和数据通道。准备完成后发起ready信号,整个运行图开始运转。
计算单元提供以下控制接口:
1. `status`: 获取计算单元的状态。
2. `start`: 告知计算单元开始计算。
3. `pause`: 暂停计算单元的运算。
4. `restart`: 恢复暂停的计算单元。
5. `stop`: 结束当前任务。

### 数据传输
4. **元数据库 (MetaStore)**
每个 DAG 运行时都维护一个元数据库,用于记录每个节点的每个数据批次状态。数据批次经历 `Received``Processed``Clean` 状态。为了保证数据的安全性:
-`Received` 状态之前,上游节点不能删除数据。
-`Processed` 状态之前,当前节点必须保留数据。

对于数据计算单元,数据使用广播到后续的所有数据通道,保证后续节点都会处理到该数据。
对于数据通道, 数据采用扇出的方式,保证数据不会被重复处理。
元数据库中的记录包括:
1. `global`: 保存全局状态(图形态、图状态)。
2. `node`: 记录每个节点的状态。
3. `data batch`: 记录每个数据批次的状态。

### 计算节点
5. **缓存数据 (TempFileStore)**
缓存数据用于暂时存储接收到的数据,有两种模式:
- **持久化缓存**: 防止系统崩溃或 bug 导致的数据丢失,适用于文件系统、对象存储、消息队列等。
- **内存缓存**: 提供快速数据访问,但抗风险能力较差。

计算节点运行在k8s的一个pod当中,一个pod包含两个container,分别是数据容器和用户容器。
## 计算单元运行模型

1. 数据容器用于控制数据从之前的数据通道获取数据,把数据写入到一个随机路径,然后通过ipc的方式发送信号告知用户容器有新的batch需要处理,用户容器处理完成后,回复完成信号。数据batch在进行删除缓存数据开始下一个batch
计算单元运行在 Kubernetes 的 Pod 中,每个 Pod 包含两个容器:数据容器和用户容器

2. 用户容器 允许用户自定义脚本。从特定位置读取数据,处理数据,并写入到指定的输出位置
1. **数据容器**: 控制数据流,从前序节点接收数据并记录保存,将用户容器生成的数据发送给后续节点。数据容器还负责管理数据状态的流转,并为用户容器提供接口,以便用户容器从数据流中提取或写入数据

![计算节点pod](./images/compute_unit_pod.png)
2. **用户容器**: 允许用户自定义镜像,从特定位置读取数据,处理数据,并将结果写入给定的输出位置。

### 数据通道节点
## 数据生命周期管理

数据通道节点采用单容器pod。订阅多个前序节点的数据输出。把数据合并成一个batch,并把数据通过扇出的方式传给后面的计算节点
通过数据标识控制数据的生命周期和可用性。将大数据集分为多个小批次(batch),并逐个发送以分片处理,便于通过扩展节点数量提升处理能力

### 节点一致性协调
### 标签机制

因为一个节点(包括计算节点和数据通道)需要支持水平扩充。因此需要维护这些pod数据的一致性。一致性的目标是:
1.可以自由伸缩,根据情况调整节点规模
2.保证输入的数据仅被处理一次,并且能成功传输给后续的节点。
- **Keep Data**: 数据传输后保存该数据。
- **Transparent Transfer**: 数据仅传输,不主动处理,但用户可以主动检索。

## 传输协议

传输协议采用grpc,原因在于

1. 长连接,容易实现订阅事件机制
2. protobuf协议紧凑,节省流量,效率高
传输协议采用 gRPC,因为 protobuf 协议紧凑且高效。

## 数据库

数据库采用postgres,效率良好,使用灵活。和jzfs保持一致。
元数据库采用mongo,大容量吞吐能力强,动态建表, 数据结构灵活。
数据库采用 MongoDB,具备大容量吞吐能力、动态建表、数据结构灵活的特点。

## 云原生
## 云原生架构

所有组件部署在k8s集群中,每个计算节点和数据通道都是一个独立的部署单元(Deployment),可以通过合适的编程扩充处理单元的数量
所有组件部署在 Kubernetes 集群中,每个计算节点是一个独立的部署单元。借助 Kubernetes 的 storageclass 机制,为 DAG 中的节点快速生成缓存存储

## 日志采集及运行状态监控

通过k8s系统采集所有节点日志,搜集k8s集群搜集节点状态信息(数据吞吐量,cpu, 内存,磁盘,网络)等,最后综合显示在ui上。

## 待确认(tbd)
通过 Kubernetes 系统采集所有节点日志,汇总节点状态信息(如数据吞吐量、CPU、内存、磁盘、网络等),并在 UI 上综合展示。

## 待确认 (TBD)
100 changes: 0 additions & 100 deletions docs/详细设计.md

This file was deleted.

Loading

0 comments on commit b77b739

Please sign in to comment.