Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Onvif backchannel support #91

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions examples/client/src/info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ pub async fn run(opts: Opts) -> Result<(), Error> {
let session = retina::client::Session::describe(
opts.src.url.clone(),
retina::client::SessionOptions::default()
.back_channel(true)
.creds(creds)
.user_agent("Retina sdp example".to_owned()),
)
Expand Down
50 changes: 48 additions & 2 deletions src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use url::Url;

use crate::client::parse::SessionHeader;
use crate::codec::CodecItem;
use crate::rtp::SendingPacket;
use crate::{
Error, ErrorInt, RtspMessageContext, StreamContext, StreamContextInner, TcpStreamContext,
UdpStreamContext,
Expand Down Expand Up @@ -505,6 +506,7 @@ pub struct SessionOptions {
teardown: TeardownPolicy,
unassigned_channel_data: UnassignedChannelDataPolicy,
session_id: SessionIdPolicy,
back_channel: bool,
}

/// Policy for handling data received on unassigned RTSP interleaved channels.
Expand Down Expand Up @@ -709,6 +711,11 @@ impl SessionOptions {
self
}

pub fn back_channel(mut self, back_channel: bool) -> Self {
self.back_channel = back_channel;
self
}

pub fn session_id(mut self, policy: SessionIdPolicy) -> Self {
self.session_id = policy;
self
Expand Down Expand Up @@ -839,6 +846,13 @@ impl std::ops::Deref for Tool {
}
}

//enum sending or Receiving
#[derive(Copy, Clone, Debug)]
pub enum StreamDirection{
Receiving,
Sending,
}

/// Information about a stream offered within a presentation.
///
/// Currently if multiple formats are offered, this only describes the first.
Expand All @@ -854,6 +868,7 @@ pub struct Stream {
channels: Option<NonZeroU16>,
framerate: Option<f32>,
control: Option<Url>,
direction: StreamDirection
}

impl std::fmt::Debug for Stream {
Expand All @@ -866,6 +881,7 @@ impl std::fmt::Debug for Stream {
.field("clock_rate", &self.clock_rate_hz)
.field("channels", &self.channels)
.field("framerate", &self.framerate)
.field("direction", &self.direction)
.field("depacketizer", &self.depacketizer)
.field("state", &self.state)
.finish()
Expand Down Expand Up @@ -933,6 +949,10 @@ impl Stream {
pub fn control(&self) -> Option<&Url> {
self.control.as_ref()
}

pub fn direction(&self) -> StreamDirection {
self.direction
}
}

struct UdpSockets {
Expand Down Expand Up @@ -1498,8 +1518,12 @@ impl Session<Described> {
) -> Result<Self, Error> {
let mut req = rtsp_types::Request::builder(Method::Describe, rtsp_types::Version::V1_0)
.header(rtsp_types::headers::ACCEPT, "application/sdp")
.request_uri(url.clone())
.build(Bytes::new());
.request_uri(url.clone());
if options.back_channel{
req= req.header(rtsp_types::headers::REQUIRE, "www.onvif.org/ver20/backchannel");
}

let mut req = req.build(Bytes::new());
let mut requested_auth = None;
let (msg_ctx, cseq, response) = conn
.send(
Expand Down Expand Up @@ -2532,6 +2556,28 @@ impl PinnedDrop for SessionInner {
}
}

impl futures::Sink<SendingPacket> for Session<Playing>{
type Error = futures::io::Error;

fn poll_ready(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}

fn start_send(self: Pin<&mut Self>, item: SendingPacket) -> Result<(), Self::Error> {
//self.0.conn.as_mut().unwrap().inner.send(item);
Ok(())
}

fn poll_flush(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}

fn poll_close(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
}


impl futures::Stream for Session<Playing> {
type Item = Result<PacketItem, Error>;

Expand Down
7 changes: 6 additions & 1 deletion src/client/parse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use sdp_types::Media;
use std::{net::IpAddr, num::NonZeroU16};
use url::Url;

use super::{Presentation, Stream};
use super::{Presentation, Stream, StreamDirection};

/// A static payload type in the [RTP parameters
/// registry](https://www.iana.org/assignments/rtp-parameters/rtp-parameters.xhtml#rtp-parameters-1).
Expand Down Expand Up @@ -257,6 +257,7 @@ fn parse_media(base_url: &Url, media_description: &Media) -> Result<Stream, Stri
let mut fmtp = None;
let mut control = None;
let mut framerate = None;
let mut direction = StreamDirection::Receiving;
for a in &media_description.attributes {
match a.attribute.as_str() {
"rtpmap" => {
Expand Down Expand Up @@ -313,6 +314,9 @@ fn parse_media(base_url: &Url, media_description: &Media) -> Result<Stream, Stri
}
}
}
"sendonly" => {
direction=StreamDirection::Sending;
}
_ => (),
}
}
Expand Down Expand Up @@ -376,6 +380,7 @@ fn parse_media(base_url: &Url, media_description: &Media) -> Result<Stream, Stri
channels,
framerate,
state: super::StreamState::Uninit,
direction,
})
}

Expand Down
15 changes: 15 additions & 0 deletions src/rtp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,21 @@ impl RawPacketBuilder {
}
}

pub struct SendingPacket{
stream_id: usize,
raw: RawPacket,
payload_range: Range<u16>,
}

impl std::fmt::Debug for SendingPacket {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("SendingPacket")
.field("streamId", &self.stream_id)
.field("payload_type", &self.raw.payload_type())
.finish()
}
}

/// A received RTP packet.
///
/// This holds more information than the packet itself: also a
Expand Down