Skip to content
This repository has been archived by the owner on Jul 4, 2024. It is now read-only.

Commit

Permalink
Re-add download timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
bytedream committed Jan 14, 2024
1 parent f3faa5b commit 20f796f
Show file tree
Hide file tree
Showing 6 changed files with 104 additions and 63 deletions.
19 changes: 10 additions & 9 deletions crunchy-cli-core/src/archive/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,15 +204,16 @@ impl Execute for Archive {

single_format_collection.full_visual_output();

let download_builder = DownloadBuilder::new(ctx.crunchy.client())
.default_subtitle(self.default_subtitle.clone())
.download_fonts(self.include_fonts)
.ffmpeg_preset(self.ffmpeg_preset.clone().unwrap_or_default())
.ffmpeg_threads(self.ffmpeg_threads)
.output_format(Some("matroska".to_string()))
.audio_sort(Some(self.audio.clone()))
.subtitle_sort(Some(self.subtitle.clone()))
.threads(self.threads);
let download_builder =
DownloadBuilder::new(ctx.client.clone(), ctx.rate_limiter.clone())
.default_subtitle(self.default_subtitle.clone())
.download_fonts(self.include_fonts)
.ffmpeg_preset(self.ffmpeg_preset.clone().unwrap_or_default())
.ffmpeg_threads(self.ffmpeg_threads)
.output_format(Some("matroska".to_string()))
.audio_sort(Some(self.audio.clone()))
.subtitle_sort(Some(self.subtitle.clone()))
.threads(self.threads);

for single_formats in single_format_collection.into_iter() {
let (download_formats, mut format) = get_format(&self, &single_formats).await?;
Expand Down
23 changes: 12 additions & 11 deletions crunchy-cli-core/src/download/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,17 +216,18 @@ impl Execute for Download {

single_format_collection.full_visual_output();

let download_builder = DownloadBuilder::new(ctx.crunchy.client())
.default_subtitle(self.subtitle.clone())
.force_hardsub(self.force_hardsub)
.output_format(if is_special_file(&self.output) || self.output == "-" {
Some("mpegts".to_string())
} else {
None
})
.ffmpeg_preset(self.ffmpeg_preset.clone().unwrap_or_default())
.ffmpeg_threads(self.ffmpeg_threads)
.threads(self.threads);
let download_builder =
DownloadBuilder::new(ctx.client.clone(), ctx.rate_limiter.clone())
.default_subtitle(self.subtitle.clone())
.force_hardsub(self.force_hardsub)
.output_format(if is_special_file(&self.output) || self.output == "-" {
Some("mpegts".to_string())
} else {
None
})
.ffmpeg_preset(self.ffmpeg_preset.clone().unwrap_or_default())
.ffmpeg_threads(self.ffmpeg_threads)
.threads(self.threads);

for mut single_formats in single_format_collection.into_iter() {
// the vec contains always only one item
Expand Down
79 changes: 46 additions & 33 deletions crunchy-cli-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crunchyroll_rs::crunchyroll::CrunchyrollBuilder;
use crunchyroll_rs::error::Error;
use crunchyroll_rs::{Crunchyroll, Locale};
use log::{debug, error, warn, LevelFilter};
use reqwest::Proxy;
use reqwest::{Client, Proxy};
use std::{env, fs};

mod archive;
Expand Down Expand Up @@ -235,11 +235,51 @@ async fn execute_executor(executor: impl Execute, ctx: Context) {
}

async fn create_ctx(cli: &mut Cli) -> Result<Context> {
let crunchy = crunchyroll_session(cli).await?;
Ok(Context { crunchy })
let client = {
let mut builder = CrunchyrollBuilder::predefined_client_builder();
if let Some(p) = &cli.proxy {
builder = builder.proxy(p.clone())
}
if let Some(ua) = &cli.user_agent {
builder = builder.user_agent(ua)
}

#[cfg(any(feature = "openssl-tls", feature = "openssl-tls-static"))]
let client = {
let mut builder = builder.use_native_tls().tls_built_in_root_certs(false);

for certificate in rustls_native_certs::load_native_certs().unwrap() {
builder = builder.add_root_certificate(
reqwest::Certificate::from_der(certificate.0.as_slice()).unwrap(),
)
}

builder.build().unwrap()
};
#[cfg(not(any(feature = "openssl-tls", feature = "openssl-tls-static")))]
let client = builder.build().unwrap();

client
};

let rate_limiter = cli
.speed_limit
.map(|l| RateLimiterService::new(l, client.clone()));

let crunchy = crunchyroll_session(cli, client.clone(), rate_limiter.clone()).await?;

Ok(Context {
crunchy,
client,
rate_limiter,
})
}

async fn crunchyroll_session(cli: &mut Cli) -> Result<Crunchyroll> {
async fn crunchyroll_session(
cli: &mut Cli,
client: Client,
rate_limiter: Option<RateLimiterService>,
) -> Result<Crunchyroll> {
let supported_langs = vec![
Locale::ar_ME,
Locale::de_DE,
Expand Down Expand Up @@ -273,33 +313,6 @@ async fn crunchyroll_session(cli: &mut Cli) -> Result<Crunchyroll> {
lang
};

let client = {
let mut builder = CrunchyrollBuilder::predefined_client_builder();
if let Some(p) = &cli.proxy {
builder = builder.proxy(p.clone())
}
if let Some(ua) = &cli.user_agent {
builder = builder.user_agent(ua)
}

#[cfg(any(feature = "openssl-tls", feature = "openssl-tls-static"))]
let client = {
let mut builder = builder.use_native_tls().tls_built_in_root_certs(false);

for certificate in rustls_native_certs::load_native_certs().unwrap() {
builder = builder.add_root_certificate(
reqwest::Certificate::from_der(certificate.0.as_slice()).unwrap(),
)
}

builder.build().unwrap()
};
#[cfg(not(any(feature = "openssl-tls", feature = "openssl-tls-static")))]
let client = builder.build().unwrap();

client
};

let mut builder = Crunchyroll::builder()
.locale(locale)
.client(client.clone())
Expand All @@ -308,8 +321,8 @@ async fn crunchyroll_session(cli: &mut Cli) -> Result<Crunchyroll> {
if let Command::Download(download) = &cli.command {
builder = builder.preferred_audio_locale(download.audio.clone())
}
if let Some(speed_limit) = cli.speed_limit {
builder = builder.middleware(RateLimiterService::new(speed_limit, client));
if let Some(rate_limiter) = rate_limiter {
builder = builder.middleware(rate_limiter)
}

let root_login_methods_count = cli.login_method.credentials.is_some() as u8
Expand Down
4 changes: 4 additions & 0 deletions crunchy-cli-core/src/utils/context.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
use crate::utils::rate_limit::RateLimiterService;
use crunchyroll_rs::Crunchyroll;
use reqwest::Client;

pub struct Context {
pub crunchy: Crunchyroll,
pub client: Client,
pub rate_limiter: Option<RateLimiterService>,
}
41 changes: 31 additions & 10 deletions crunchy-cli-core/src/utils/download.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::utils::ffmpeg::FFmpegPreset;
use crate::utils::filter::real_dedup_vec;
use crate::utils::os::{cache_dir, is_special_file, temp_directory, temp_named_pipe, tempfile};
use crate::utils::rate_limit::RateLimiterService;
use anyhow::{bail, Result};
use chrono::NaiveTime;
use crunchyroll_rs::media::{Subtitle, VariantData, VariantSegment};
Expand All @@ -26,6 +27,7 @@ use tokio::sync::mpsc::unbounded_channel;
use tokio::sync::Mutex;
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
use tower_service::Service;

#[derive(Clone, Debug)]
pub enum MergeBehavior {
Expand All @@ -48,6 +50,7 @@ impl MergeBehavior {
#[derive(Clone, derive_setters::Setters)]
pub struct DownloadBuilder {
client: Client,
rate_limiter: Option<RateLimiterService>,
ffmpeg_preset: FFmpegPreset,
default_subtitle: Option<Locale>,
output_format: Option<String>,
Expand All @@ -60,9 +63,10 @@ pub struct DownloadBuilder {
}

impl DownloadBuilder {
pub fn new(client: Client) -> DownloadBuilder {
pub fn new(client: Client, rate_limiter: Option<RateLimiterService>) -> DownloadBuilder {
Self {
client,
rate_limiter,
ffmpeg_preset: FFmpegPreset::default(),
default_subtitle: None,
output_format: None,
Expand All @@ -78,6 +82,7 @@ impl DownloadBuilder {
pub fn build(self) -> Downloader {
Downloader {
client: self.client,
rate_limiter: self.rate_limiter,
ffmpeg_preset: self.ffmpeg_preset,
default_subtitle: self.default_subtitle,
output_format: self.output_format,
Expand Down Expand Up @@ -109,6 +114,7 @@ pub struct DownloadFormat {

pub struct Downloader {
client: Client,
rate_limiter: Option<RateLimiterService>,

ffmpeg_preset: FFmpegPreset,
default_subtitle: Option<Locale>,
Expand Down Expand Up @@ -768,6 +774,8 @@ impl Downloader {
for num in 0..cpus {
let thread_sender = sender.clone();
let thread_segments = segs.remove(0);
let thread_client = self.client.clone();
let mut thread_rate_limiter = self.rate_limiter.clone();
let thread_count = count.clone();
join_set.spawn(async move {
let after_download_sender = thread_sender.clone();
Expand All @@ -778,21 +786,34 @@ impl Downloader {
let download = || async move {
for (i, segment) in thread_segments.into_iter().enumerate() {
let mut retry_count = 0;
let buf = loop {
let mut buf = vec![];
match segment.write_to(&mut buf).await {
Ok(_) => break buf,
Err(e) => {
if retry_count == 5 {
bail!("Max retry count reached ({}), multiple errors occurred while receiving segment {}: {}", retry_count, num + (i * cpus), e)
}
debug!("Failed to download segment {} ({}). Retrying, {} out of 5 retries left", num + (i * cpus), e, 5 - retry_count)
let mut buf = loop {
let request = thread_client
.get(&segment.url)
.timeout(Duration::from_secs(60));
let response = if let Some(rate_limiter) = &mut thread_rate_limiter {
rate_limiter.call(request.build()?).await.map_err(anyhow::Error::new)
} else {
request.send().await.map_err(anyhow::Error::new)
};

let err = match response {
Ok(r) => match r.bytes().await {
Ok(b) => break b.to_vec(),
Err(e) => anyhow::Error::new(e)
}
Err(e) => e,
};

if retry_count == 5 {
bail!("Max retry count reached ({}), multiple errors occurred while receiving segment {}: {}", retry_count, num + (i * cpus), err)
}
debug!("Failed to download segment {} ({}). Retrying, {} out of 5 retries left", num + (i * cpus), err, 5 - retry_count);

retry_count += 1;
};

buf = VariantSegment::decrypt(&mut buf, segment.key)?.to_vec();

let mut c = thread_count.lock().await;
debug!(
"Downloaded and decrypted segment [{}/{} {:.2}%] {}",
Expand Down
1 change: 1 addition & 0 deletions crunchy-cli-core/src/utils/rate_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use std::sync::Arc;
use std::task::{Context, Poll};
use tower_service::Service;

#[derive(Clone)]
pub struct RateLimiterService {
client: Arc<Client>,
rate_limiter: Limiter,
Expand Down

0 comments on commit 20f796f

Please sign in to comment.