Skip to content

Commit

Permalink
No commit message
Browse files Browse the repository at this point in the history
  • Loading branch information
NikolaRHristov committed Sep 17, 2024
1 parent ecb71f2 commit 85692dd
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 28 deletions.
61 changes: 33 additions & 28 deletions Source/Fn/Binary/Command/Parallel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,47 +32,53 @@ pub mod Process;
pub async fn Fn(Option { Entry, Separator, Pattern, Command, .. }: Option) {
let (Approval, mut Receive) = tokio::sync::mpsc::unbounded_channel();

let Entry = tokio::task::spawn_blocking(move || {
Entry
.into_par_iter()
.filter_map(|Entry| {
Entry
.last()
.filter(|Last| *Last == &Pattern)
.map(|_| Entry[0..Entry.len() - 1].join(&Separator.to_string()))
})
.collect::<Vec<String>>()
})
.await
.expect("Cannot spawn_blocking.");
let Entry = Entry
.into_par_iter()
.filter_map(|Entry| {
Entry
.last()
.filter(|Last| *Last == &Pattern)
.map(|_| Entry[0..Entry.len() - 1].join(&Separator.to_string()))
})
.collect::<Vec<String>>();

let Pool = Arc::new(rayon::ThreadPoolBuilder::new().build().expect("Cannot build."));
let Semaphore = Arc::new(tokio::sync::Semaphore::new(num_cpus::get()));

futures::stream::iter(Entry.into_iter())
.map(|Entry| {
let Command = Command.clone();
let Approval = Approval.clone();
futures::future::join_all(Entry.into_iter().map(|Entry| {
let Command = Command.clone();
let Approval = Approval.clone();
let Semaphore = Arc::clone(&Semaphore);
let Pool = Arc::clone(&Pool);

async move {
tokio::spawn(async move {
let _Permit = Semaphore.acquire().await.expect("Cannot acquire.");

if let Err(_) = Approval.send(Pool.install(|| {
let mut Output = Vec::new();

for Command in &Command {
let Command: Vec<String> = Command.split(' ').map(String::from).collect();

if GPG::Fn(&Command) {
let Lock = GPG_MUTEX.lock().await;
let Lock = GPG_MUTEX.lock().expect("Cannot lock.");
drop(Lock);
}

Output.push(Process::Fn(&Command, &Entry).await);
Output.push(tokio::task::block_in_place(|| {
tokio::runtime::Runtime::new()
.expect("Cannot Runtime.")
.block_on(Process::Fn(&Command, &Entry))
}));
}

if let Err(_) = Approval.send(Output) {
eprintln!("Cannot send.");
}
Output
})) {
eprintln!("Cannot send.");
}
})
.buffer_unordered(num_cpus::get())
.collect::<Vec<()>>()
.await;
}))
.await;

drop(Approval);

Expand All @@ -84,9 +90,8 @@ pub async fn Fn(Option { Entry, Separator, Pattern, Command, .. }: Option) {
}

use crate::Struct::Binary::Command::Entry::Struct as Option;
use futures::StreamExt;
use once_cell::sync::Lazy;
use rayon::iter::{IntoParallelIterator, ParallelIterator};
use tokio::sync::Mutex;
use std::sync::{Arc, Mutex};

static GPG_MUTEX: Lazy<Mutex<()>> = Lazy::new(|| Mutex::new(()));
Binary file modified Target/release/Inn.exe
Binary file not shown.
Binary file modified Target/release/InnKeeper.exe
Binary file not shown.
Binary file modified Target/release/PRun.exe
Binary file not shown.
Binary file modified Target/release/Run.exe
Binary file not shown.

0 comments on commit 85692dd

Please sign in to comment.