Skip to content

Commit

Permalink
Merge pull request #2 from kornia/rtspcam-stream
Browse files Browse the repository at this point in the history
Rtspcam stream and remote rerun logger
  • Loading branch information
edgarriba authored Jan 5, 2025
2 parents b6a33e0 + 2b664ce commit 0bc773d
Show file tree
Hide file tree
Showing 19 changed files with 710 additions and 326 deletions.
5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,13 @@ kornia = { version = "0.1.8", features = ["gstreamer"] }
log = "0.4"
rayon = "1.10"
reqwest = { version = "0.12", features = ["json"] }
rerun = "0.21.0"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
tempfile = "3.14"
tokio = { version = "1", features = ["full"] }
walkdir = "2.5"
whoami = "1.5"

cu29 = { git = "https://github.com/copper-project/copper-rs.git", branch = "master" }
cu29-helpers = { git = "https://github.com/copper-project/copper-rs.git", branch = "master" }
cu29 = { git = "https://github.com/copper-project/copper-rs.git", sha = "c6d2b1b5069057007e197faf858f932381ae260a" }
cu29-helpers = { git = "https://github.com/copper-project/copper-rs.git", sha = "c6d2b1b5069057007e197faf858f932381ae260a" }
5 changes: 5 additions & 0 deletions Cross.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
[build]
default-target = "aarch64-unknown-linux-gnu"

[target.aarch64-unknown-linux-gnu]
dockerfile = "docker/aarch64.Dockerfile"
44 changes: 44 additions & 0 deletions bubbaloop.ron
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
(
tasks: [
(
id: "cam0",
type: "crate::cu29::tasks::V4L2Camera",
config: {
"camera_id": 0,
"rows": 480,
"cols": 640,
"fps": 30,
}
),
// NOTE: uncomment to use this camera
// (
// id: "cam0",
// type: "crate::cu29::tasks::RTSPCamera",
// config: {
// // URL of the RTSP camera
// // rtsp://<username>:<password>@<ip>:<port>/<stream>
// "url": "rtsp://tapo_entrance:[email protected]:554/stream2",
// }
// ),
(
id: "rerun",
type: "crate::cu29::tasks::RerunLogger",
config: {
// Path to the directory where the logs will be stored
"path": "/tmp/",
// IP address of the rerun server
"ip": "192.168.1.144",
// Port of the rerun server
"port": 9876,
}
)
],
cnx: [
(src: "cam0", dst: "rerun", msg: "crate::cu29::msgs::ImageRGBU8Msg"),
]
,
logging: (
slab_size_mib: 1024, // Preallocates 1GiB of memory map file at a time
section_size_mib: 100, // Preallocates 100MiB of memory map per section for the main logger.
),
)
24 changes: 24 additions & 0 deletions docker/aarch64.Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
#FROM ghcr.io/cross-rs/aarch64-unknown-linux-gnu:edge-centos
#FROM ghcr.io/cross-rs/aarch64-unknown-linux-gnu:0.2.5
#FROM ghcr.io/cross-rs/aarch64-unknown-linux-gnu@sha256:b4eff900bf2007cbcb54335a5826dedde6082f484bc8be7499d5ed071608ecf3
FROM nvcr.io/nvidia/l4t-base:r35.2.1

RUN apt-get update && apt-get install --assume-yes \
cmake \
curl \
pkg-config \
&& \
apt-get clean

RUN dpkg --add-architecture arm64

RUN apt-get update && apt-get install --assume-yes \
nasm \
libgstreamer1.0-dev:arm64 \
libgstreamer-plugins-base1.0-dev:arm64 \
libssl-dev:arm64 \
libglib2.0-dev:arm64 \
libudev-dev:arm64 \
&& \
apt-get clean

8 changes: 7 additions & 1 deletion justfile
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
@_default:
just --list

build:
cargo build --release

Expand All @@ -19,7 +22,7 @@ install_deps:
help:
cargo run --release --bin bubbaloop -- --help

serve HOST PORT:
serve HOST="0.0.0.0" PORT="3000":
RUST_LOG=debug cargo run --release --bin serve -- -h {{HOST}} -p {{PORT}}

whoami HOST PORT:
Expand All @@ -36,3 +39,6 @@ pipeline-stop HOST PORT ID:

pipeline-list HOST PORT:
RUST_LOG=info cargo run --release --bin bubbaloop -- -h {{HOST}} -p {{PORT}} pipeline list

pipeline-config HOST PORT:
RUST_LOG=info cargo run --release --bin bubbaloop -- -h {{HOST}} -p {{PORT}} pipeline config
62 changes: 62 additions & 0 deletions scripts/cross_deploy.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
#!/bin/bash

# Stop the script if any command fails
set -e

# Parse command line arguments
while getopts "r:u:" opt; do
case $opt in
r) TARGET_IP="$OPTARG" # Target IP
;;
u) TARGET_USER="$OPTARG" # Target user
;;
esac
done

# Check if required arguments are provided
if [ -z "$TARGET_IP" ] || [ -z "$TARGET_USER" ]; then
echo "Usage: $0 -r <target-ip> -u <target-user>"
exit 1
fi

# Configuration
TARGET_PATH="/home/$TARGET_USER/deploy"
BINARY_NAME="serve"
LOCAL_FOLDER="/tmp/deploy_serve"
DEPLOY_ARCH="aarch64-unknown-linux-gnu"

# Colors for output
GREEN='\033[0;32m'
RED='\033[0;31m'
NC='\033[0m' # No Color

# Function to print status
print_status() {
echo -e "${GREEN}==> ${1}${NC}"
}

print_error() {
echo -e "${RED}==> ERROR: ${1}${NC}"
exit 1
}

# Check if cross is installed
if ! command -v cross &> /dev/null; then
print_error "cross is not installed. Install it with: cargo install cross"
fi

# Build the release binary
print_status "Building release binary for aarch64..."
cross build --target $DEPLOY_ARCH --release --bin $BINARY_NAME || print_error "Build failed"
rsync -a target/$DEPLOY_ARCH/release/$BINARY_NAME $LOCAL_FOLDER

# Check if binary exists
if [ ! -f "target/$DEPLOY_ARCH/release/$BINARY_NAME" ]; then
print_error "Binary not found after build"
fi

# Copy to remote machine
print_status "Copying to $TARGET_USER@$TARGET_IP:$TARGET_PATH..."
rsync -a $LOCAL_FOLDER $TARGET_USER@$TARGET_IP:$TARGET_PATH

print_status "Deploy completed successfully!"
1 change: 1 addition & 0 deletions src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ impl ApiServer {
.route("/api/v0/pipeline/start", post(pipeline::start_pipeline))
.route("/api/v0/pipeline/stop", post(pipeline::stop_pipeline))
.route("/api/v0/pipeline/list", get(pipeline::list_pipelines))
.route("/api/v0/pipeline/config", get(pipeline::get_config))
.with_state(store);

let listener = tokio::net::TcpListener::bind(addr).await?;
Expand Down
26 changes: 20 additions & 6 deletions src/bin/bubbaloop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ enum PipelineMode {
Start(PipelineStartCommand),
Stop(PipelineStopCommand),
List(PipelineListCommand),
Config(PipelineConfigCommand),
}

#[derive(FromArgs)]
Expand All @@ -113,6 +114,11 @@ struct PipelineStopCommand {
/// List pipelines
struct PipelineListCommand {}

#[derive(FromArgs)]
#[argh(subcommand, name = "config")]
/// Get the pipeline config
struct PipelineConfigCommand {}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let args: CLIArgs = argh::from_env();
Expand Down Expand Up @@ -140,7 +146,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.await?;

let result = response.json::<serde_json::Value>().await?;
println!("Result: {:?}", result);
println!("Result: {}", serde_json::to_string_pretty(&result)?);
}
},
Commands::Stats(stats_command) => match stats_command.mode {
Expand All @@ -151,12 +157,11 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.await?;

let result = response.json::<serde_json::Value>().await?;
println!("Result: {:?}", result);
println!("Result: {}", serde_json::to_string_pretty(&result)?);
}
},
Commands::Pipeline(pipeline_command) => match pipeline_command.mode {
PipelineMode::Start(pipeline_start_command) => {
println!("Sending request to {} {} ", addr, pipeline_start_command.id);
let response = client
.post(format!("http://{}/api/v0/pipeline/start", addr))
.json(&bubbaloop::pipeline::PipelineStartRequest {
Expand All @@ -166,7 +171,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.await?;

let result = response.json::<serde_json::Value>().await?;
println!("Result: {:?}", result);
println!("Result: {}", serde_json::to_string_pretty(&result)?);
}
PipelineMode::Stop(pipeline_stop_command) => {
let response = client
Expand All @@ -178,7 +183,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.await?;

let result = response.json::<serde_json::Value>().await?;
println!("Result: {:?}", result);
println!("Result: {}", serde_json::to_string_pretty(&result)?);
}
PipelineMode::List(_pipeline_list_command) => {
let response = client
Expand All @@ -187,7 +192,16 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.await?;

let result = response.json::<serde_json::Value>().await?;
println!("Result: {:?}", result);
println!("Result: {}", serde_json::to_string_pretty(&result)?);
}
PipelineMode::Config(_pipeline_config_command) => {
let response = client
.get(format!("http://{}/api/v0/pipeline/config", addr))
.send()
.await?;

let result = response.json::<serde_json::Value>().await?;
println!("Result: {}", serde_json::to_string_pretty(&result)?);
}
},
}
Expand Down
36 changes: 0 additions & 36 deletions src/cu29.rs

This file was deleted.

86 changes: 86 additions & 0 deletions src/cu29/app.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
use cu29::prelude::*;
use cu29_helpers::basic_copper_setup;
use std::sync::{atomic::AtomicBool, Arc};
const SLAB_SIZE: Option<usize> = Some(150 * 1024 * 1024);

pub type PipelineResult = Result<(), Box<dyn std::error::Error + Send + Sync>>;

// NOTE: this will use the default config file in the current directory during compilation
// however, it will be overridden by the ron config string when the pipeline is started
#[copper_runtime(config = "bubbaloop.ron")]
struct CopperApp {}

pub struct CopperPipeline(pub CopperApp);

impl CopperPipeline {
pub fn new() -> CuResult<Self> {
// NOTE: this is a temporary solution to store the logger in the user's home directory
let logger_dir = std::path::PathBuf::from(&format!("/home/{}", whoami::username()));
let logger_path = logger_dir.join("bubbaloop.copper");
debug!("Logger path: {}", path = &logger_path);

let copper_ctx = basic_copper_setup(&logger_path, SLAB_SIZE, true, None)?;
let application = CopperAppBuilder::new().with_context(&copper_ctx).build()?;

Ok(Self(application))
}
}

/// Spawns a new thread for the pipeline
///
/// This function is used to spawn a new thread for the pipeline
/// and to pass the stop signal to the pipeline
///
/// # Arguments
///
/// * `pipeline_id` - The id of the pipeline
/// * `stop_signal` - The stop signal to stop the pipeline
///
/// # Returns
///
/// A handle to the thread that runs the pipeline
pub fn spawn_cu29_thread(
pipeline_id: &str,
stop_signal: Arc<AtomicBool>,
) -> std::thread::JoinHandle<PipelineResult> {
let pipeline_id = pipeline_id.to_string();
std::thread::spawn({
let stop_signal = stop_signal.clone();
move || -> PipelineResult {
// parse the ron config string and create the pipeline
let mut app = CopperPipeline::new()?;

// create the pipeline and start the tasks
app.start_all_tasks()?;

while !stop_signal.load(std::sync::atomic::Ordering::Relaxed) {
// we run the pipeline iteration step by step
app.run_one_iteration()?;

// NOTE: is this really needed?
std::thread::sleep(std::time::Duration::from_millis(30));
}

// stop the pipeline and wait for the tasks to finish
app.stop_all_tasks()?;

log::debug!("Pipeline {} stopped", pipeline_id);

Ok(())
}
})
}

impl std::ops::Deref for CopperPipeline {
type Target = CopperApp;

fn deref(&self) -> &Self::Target {
&self.0
}
}

impl std::ops::DerefMut for CopperPipeline {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
3 changes: 3 additions & 0 deletions src/cu29/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
pub mod app;
pub mod msgs;
pub mod tasks;
Loading

0 comments on commit 0bc773d

Please sign in to comment.