Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add deserialization of webrtc stats #553

Merged
merged 1 commit into from
Apr 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions ice/src/candidate/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use std::time::SystemTime;

use async_trait::async_trait;
use candidate_base::*;
use serde::Serialize;
use serde::{Deserialize, Serialize};
use tokio::sync::{broadcast, Mutex};

use crate::error::Result;
Expand Down Expand Up @@ -88,7 +88,7 @@ pub trait Candidate: fmt::Display {
}

/// Represents the type of candidate `CandidateType` enum.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize)]
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum CandidateType {
#[serde(rename = "unspecified")]
Unspecified,
Expand Down Expand Up @@ -170,7 +170,7 @@ impl fmt::Display for CandidateRelatedAddress {
}

/// Represent the ICE candidate pair state.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize)]
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum CandidatePairState {
#[serde(rename = "unspecified")]
Unspecified = 0,
Expand Down
4 changes: 2 additions & 2 deletions webrtc/src/data_channel/data_channel_state.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use std::fmt;

use serde::Serialize;
use serde::{Deserialize, Serialize};

/// DataChannelState indicates the state of a data channel.
#[derive(Default, Debug, Copy, Clone, PartialEq, Eq, Serialize)]
#[derive(Default, Debug, Copy, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum RTCDataChannelState {
#[serde(rename = "unspecified")]
#[default]
Expand Down
8 changes: 4 additions & 4 deletions webrtc/src/peer_connection/peer_connection_internal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1279,7 +1279,7 @@ impl PeerConnectionInternal {
stats_type: RTCStatsType::InboundRTP,
id: id.clone(),
ssrc,
kind,
kind: kind.to_owned(),
packets_received,
track_identifier: info.track_id,
mid: info.mid,
Expand Down Expand Up @@ -1307,7 +1307,7 @@ impl PeerConnectionInternal {
id,

ssrc,
kind,
kind: kind.to_owned(),

packets_sent: remote_packets_sent as u64,
bytes_sent: remote_bytes_sent as u64,
Expand Down Expand Up @@ -1420,7 +1420,7 @@ impl PeerConnectionInternal {
track_identifier,
id: id.clone(),
ssrc,
kind,
kind: kind.to_owned(),
packets_sent,
mid,
rid,
Expand All @@ -1447,7 +1447,7 @@ impl PeerConnectionInternal {
stats_type: RTCStatsType::RemoteInboundRTP,
id,
ssrc,
kind,
kind: kind.to_owned(),

packets_received: remote_inbound_packets_received,
packets_lost: remote_inbound_packets_lost as i64,
Expand Down
136 changes: 119 additions & 17 deletions webrtc/src/stats/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use ice::agent::agent_stats::{CandidatePairStats, CandidateStats};
use ice::agent::Agent;
use ice::candidate::{CandidatePairState, CandidateType};
use ice::network_type::NetworkType;
use serde::{Serialize, Serializer};
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use smol_str::SmolStr;
use stats_collector::StatsCollector;
use tokio::time::Instant;
Expand All @@ -22,7 +22,7 @@ use crate::sctp_transport::RTCSctpTransport;
mod serialize;
pub mod stats_collector;

#[derive(Debug, Serialize)]
#[derive(Debug, Serialize, Deserialize)]
pub enum RTCStatsType {
#[serde(rename = "candidate-pair")]
CandidatePair,
Expand Down Expand Up @@ -114,6 +114,84 @@ impl Serialize for StatsReportType {
}
}

impl<'de> Deserialize<'de> for StatsReportType {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
let value = serde_json::Value::deserialize(deserializer)?;
let type_field = value
.get("type")
.ok_or_else(|| serde::de::Error::missing_field("type"))?;
let rtc_type: RTCStatsType = serde_json::from_value(type_field.clone()).map_err(|e| {
serde::de::Error::custom(format!(
"failed to deserialize RTCStatsType from the `type` field ({}): {}",
type_field, e
))
})?;

match rtc_type {
RTCStatsType::CandidatePair => {
let stats = serde_json::from_value(value).map_err(serde::de::Error::custom)?;
Ok(StatsReportType::CandidatePair(stats))
}
RTCStatsType::Certificate => {
let stats = serde_json::from_value(value).map_err(serde::de::Error::custom)?;
Ok(StatsReportType::CertificateStats(stats))
}
RTCStatsType::Codec => {
let stats = serde_json::from_value(value).map_err(serde::de::Error::custom)?;
Ok(StatsReportType::Codec(stats))
}
RTCStatsType::CSRC => {
todo!()
}
RTCStatsType::DataChannel => {
let stats = serde_json::from_value(value).map_err(serde::de::Error::custom)?;
Ok(StatsReportType::DataChannel(stats))
}
RTCStatsType::InboundRTP => {
let stats = serde_json::from_value(value).map_err(serde::de::Error::custom)?;
Ok(StatsReportType::InboundRTP(stats))
}
RTCStatsType::LocalCandidate => {
let stats = serde_json::from_value(value).map_err(serde::de::Error::custom)?;
Ok(StatsReportType::LocalCandidate(stats))
}
RTCStatsType::OutboundRTP => {
let stats = serde_json::from_value(value).map_err(serde::de::Error::custom)?;
Ok(StatsReportType::OutboundRTP(stats))
}
RTCStatsType::PeerConnection => {
let stats = serde_json::from_value(value).map_err(serde::de::Error::custom)?;
Ok(StatsReportType::PeerConnection(stats))
}
RTCStatsType::Receiver => {
todo!()
}
RTCStatsType::RemoteCandidate => {
let stats = serde_json::from_value(value).map_err(serde::de::Error::custom)?;
Ok(StatsReportType::RemoteCandidate(stats))
}
RTCStatsType::RemoteInboundRTP => {
let stats = serde_json::from_value(value).map_err(serde::de::Error::custom)?;
Ok(StatsReportType::RemoteInboundRTP(stats))
}
RTCStatsType::RemoteOutboundRTP => {
let stats = serde_json::from_value(value).map_err(serde::de::Error::custom)?;
Ok(StatsReportType::RemoteOutboundRTP(stats))
}
RTCStatsType::Sender => {
todo!()
}
RTCStatsType::Transport => {
let stats = serde_json::from_value(value).map_err(serde::de::Error::custom)?;
Ok(StatsReportType::Transport(stats))
}
}
}
}

#[derive(Debug)]
pub struct StatsReport {
pub reports: HashMap<String, StatsReportType>,
Expand All @@ -136,7 +214,31 @@ impl Serialize for StatsReport {
}
}

#[derive(Debug, Serialize)]
impl<'de> Deserialize<'de> for StatsReport {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
let value = serde_json::Value::deserialize(deserializer)?;
let root = value
.as_object()
.ok_or(serde::de::Error::custom("root object missing"))?;

let mut reports = HashMap::new();
for (key, value) in root {
let report = serde_json::from_value(value.clone()).map_err(|e| {
serde::de::Error::custom(format!(
"failed to deserialize `StatsReportType` from key={}, value={}: {}",
key, value, e
))
})?;
reports.insert(key.clone(), report);
}
Ok(Self { reports })
}
}

#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ICECandidatePairStats {
// RTCStats
Expand Down Expand Up @@ -217,7 +319,7 @@ impl From<CandidatePairStats> for ICECandidatePairStats {
}
}

#[derive(Debug, Serialize)]
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ICECandidateStats {
// RTCStats
Expand Down Expand Up @@ -256,7 +358,7 @@ impl ICECandidateStats {
}
}

#[derive(Debug, Serialize)]
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ICETransportStats {
// RTCStats
Expand All @@ -283,7 +385,7 @@ impl ICETransportStats {
}
}

#[derive(Debug, Serialize)]
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct CertificateStats {
// RTCStats
Expand Down Expand Up @@ -313,7 +415,7 @@ impl CertificateStats {
}
}

#[derive(Debug, Serialize)]
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct CodecStats {
// RTCStats
Expand Down Expand Up @@ -347,7 +449,7 @@ impl From<&RTCRtpCodecParameters> for CodecStats {
}
}

#[derive(Debug, Serialize)]
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct DataChannelStats {
// RTCStats
Expand Down Expand Up @@ -402,7 +504,7 @@ impl DataChannelStats {
}
}

#[derive(Debug, Serialize)]
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct PeerConnectionStats {
// RTCStats
Expand Down Expand Up @@ -435,7 +537,7 @@ impl PeerConnectionStats {
}
}

#[derive(Debug, Serialize)]
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct InboundRTPStats {
// RTCStats
Expand All @@ -447,7 +549,7 @@ pub struct InboundRTPStats {

// RTCRtpStreamStats
pub ssrc: SSRC,
pub kind: &'static str, // Either "video" or "audio"
pub kind: String, // Either "video" or "audio"
// TODO: Add transportId
// TODO: Add codecId

Expand Down Expand Up @@ -482,7 +584,7 @@ pub struct InboundRTPStats {
// all decoder specific and can't be produced since we aren't decoding.
}

#[derive(Debug, Serialize)]
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct OutboundRTPStats {
// RTCStats
Expand All @@ -494,7 +596,7 @@ pub struct OutboundRTPStats {

// RTCRtpStreamStats
pub ssrc: SSRC,
pub kind: &'static str, // Either "video" or "audio"
pub kind: String, // Either "video" or "audio"
// TODO: Add transportId
// TODO: Add codecId

Expand Down Expand Up @@ -525,7 +627,7 @@ pub struct OutboundRTPStats {
// encoding.
}

#[derive(Debug, Serialize)]
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct RemoteInboundRTPStats {
// RTCStats
Expand All @@ -537,7 +639,7 @@ pub struct RemoteInboundRTPStats {

// RTCRtpStreamStats
pub ssrc: SSRC,
pub kind: &'static str, // Either "video" or "audio"
pub kind: String, // Either "video" or "audio"
// TODO: Add transportId
// TODO: Add codecId

Expand All @@ -556,7 +658,7 @@ pub struct RemoteInboundRTPStats {
pub round_trip_time_measurements: u64,
}

#[derive(Debug, Serialize)]
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct RemoteOutboundRTPStats {
// RTCStats
Expand All @@ -568,7 +670,7 @@ pub struct RemoteOutboundRTPStats {

// RTCRtpStreamStats
pub ssrc: SSRC,
pub kind: &'static str, // Either "video" or "audio"
pub kind: String, // Either "video" or "audio"
// TODO: Add transportId
// TODO: Add codecId

Expand Down
28 changes: 25 additions & 3 deletions webrtc/src/stats/serialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,8 @@
/// Note that an `Instant` is not connected to real world time, so this conversion is
/// approximate.
pub mod instant_to_epoch_seconds {
use std::time::{SystemTime, UNIX_EPOCH};

use serde::{Serialize, Serializer};
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tokio::time::Instant;

pub fn serialize<S>(instant: &Instant, serializer: S) -> Result<S::Ok, S::Error>
Expand All @@ -25,4 +24,27 @@ pub mod instant_to_epoch_seconds {

epoch_ms.serialize(serializer)
}

pub fn deserialize<'de, D>(deserializer: D) -> Result<Instant, D::Error>
where
D: Deserializer<'de>,
{
let epoch_seconds: f64 = Deserialize::deserialize(deserializer)?;

let since_epoch = Duration::from_secs_f64(epoch_seconds);

let system_now = SystemTime::now();
let instant_now = Instant::now();

let deserialized_system_time = UNIX_EPOCH + since_epoch;

let adjustment = match deserialized_system_time.duration_since(system_now) {
Ok(duration) => -duration.as_secs_f64(),
Err(e) => e.duration().as_secs_f64(),
};

let adjusted_instant = instant_now + Duration::from_secs_f64(adjustment);

Ok(adjusted_instant)
}
}
Loading