Skip to content

Commit

Permalink
No commit message
Browse files Browse the repository at this point in the history
  • Loading branch information
NikolaRHristov committed Sep 17, 2024
1 parent c7dbfce commit e15797d
Show file tree
Hide file tree
Showing 5 changed files with 85 additions and 43 deletions.
128 changes: 85 additions & 43 deletions Source/Fn/Binary/Command/Parallel.rs
Original file line number Diff line number Diff line change
@@ -1,36 +1,57 @@
pub mod GPG;
pub mod Process;

/// Executes a command pipeline based on input entries and a pattern.
///
/// This function processes a set of string entries, filters them based on a pattern,
/// and then executes a command pipeline for each matching entry. The results of each
/// pipeline execution are then sent through a channel for further processing.
/// Executes a series of commands on a list of entries concurrently.
///
/// # Arguments
///
/// * `options`: An optional struct containing the following fields:
/// * `Entry`: A vector of string slices representing the input entries.
/// * `Separator`: A string slice used to join the filtered parts of an entry.
/// * `Pattern`: A string slice representing the pattern to filter entries.
/// * `Command`: A vector of string slices representing the command pipeline to execute.
/// * `Option` - An optional struct containing the following fields:
/// * `Entry`: A vector of strings representing the entries to process.
/// * `Separator`: A string used to join the entry parts after filtering.
/// * `Pattern`: A string used to filter the entries.
/// * `Command`: A vector of strings representing the commands to execute on each entry.
///
/// # Example
///
/// ```
/// ```rust
/// use your_crate::Fn;
///
/// let options = Some(YourOptionStruct {
/// Entry: vec!["entry1/part1", "entry1/part2", "entry2/part1"],
/// Separator: "/",
/// Pattern: "part2",
/// Command: vec!["echo hello", "grep world"],
/// let options = Some(Option {
/// Entry: vec!["entry1/part1".to_string(), "entry2/part1".to_string()],
/// Separator: "/".to_string(),
/// Pattern: "part1".to_string(),
/// Command: vec!["echo {}".to_string(), "ls -l {}".to_string()],
/// });
///
/// tokio::runtime::Runtime::new().unwrap().block_on(Fn(options));
/// tokio_test::block_on(Fn(options));
/// ```
///
/// This example defines a vector of entries, a separator, a pattern and a vector of commands.
/// The `Fn` function is then called with the options.
///
/// # Details
///
/// The function first filters the entries based on the provided pattern.
/// Then, it creates a queue of filtered entries and spawns multiple worker tasks.
/// Each worker task picks an entry from the queue and executes the provided commands on it.
/// The output of each command is collected and printed to the console.
///
/// The function utilizes parallel processing using `rayon` and asynchronous programming using `tokio` to improve performance.
///
/// The `GPG_MUTEX` is used to ensure that only one thread can access the GPG functions at a time.
///
/// # Note
///
/// The function assumes that the provided commands are valid shell commands.
///
/// The function also assumes that the `GPG::Fn` and `Process::Fn` functions are defined elsewhere and have the following signatures:
///
/// ```rust
/// fn GPG::Fn(command: &[String]) -> bool;
/// async fn Process::Fn(command: &[String], entry: &str) -> String;
/// ```
pub async fn Fn(Option { Entry, Separator, Pattern, Command, .. }: Option) {
let (Approval, mut Receive) = tokio::sync::mpsc::unbounded_channel();
let (Approval, mut Receive) = mpsc::unbounded_channel();
let Force = rayon::current_num_threads();

let Entry = Entry
Expand All @@ -49,52 +70,73 @@ pub async fn Fn(Option { Entry, Separator, Pattern, Command, .. }: Option) {
Queue.push(Entry).expect("Cannot push.");
}

(0..Force).into_par_iter().for_each(|_| {
let Runtime = tokio::runtime::Runtime::new().expect("Cannot Runtime.");
let (ApprovalWork, ReceiveWork) = mpsc::channel::<String>(32);
let ReceiveWork = Arc::new(Mutex::new(ReceiveWork));

let Output = tokio::spawn(async move {
while let Some(Output) = Receive.recv().await {
for Output in Output {
println!("{}", Output);
}
}
});

Runtime.block_on(async {
let Queue = Arc::clone(&Queue);
let Approval = Approval.clone();
let Command = Command.clone();
for _ in 0..Force {
let ReceiveWork = Arc::clone(&ReceiveWork);
let Approval = Approval.clone();
let Command = Command.clone();

tokio::spawn(async move {
loop {
if let Some(Entry) = Queue.pop() {
let mut Output = Vec::new();
let Entry = { ReceiveWork.lock().await.recv().await };

for Command in &Command {
let Command: Vec<String> = Command.split(' ').map(String::from).collect();
match Entry {
Some(Entry) => {
let mut Output = Vec::new();

if GPG::Fn(&Command) {
let _Lock = GPG_MUTEX.lock().await;
}
for Command in &Command {
let Command: Vec<String> =
Command.split(' ').map(String::from).collect();

Output.push(Process::Fn(&Command, &Entry).await);
}
if GPG::Fn(&Command) {
let _Lock = GPG_MUTEX.lock().await;
}

if let Err(_) = Approval.send(Output) {
eprintln!("Cannot send.");
Output.push(Process::Fn(&Command, &Entry).await);
}

if let Err(_) = Approval.send(Output) {
eprintln!("Cannot send.");
}
}
} else {
break;
None => break,
}
}
});
}

(0..Force).into_par_iter().for_each(|_| {
let ApprovalWork = ApprovalWork.clone();
let Queue = Arc::clone(&Queue);

tokio::runtime::Runtime::new().expect("Cannot Runtime.").block_on(async {
while let Some(Entry) = Queue.pop() {
ApprovalWork.send(Entry).await.expect("Cannot send.");
}
});
});

drop(Approval);
drop(ApprovalWork);

while let Some(Output) = Receive.recv().await {
for Output in Output {
println!("{}", Output);
}
}
Output.await.expect("Output task failed");
}

use crate::Struct::Binary::Command::Entry::Struct as Option;

use once_cell::sync::Lazy;
use rayon::iter::{IntoParallelIterator, ParallelIterator};
use std::sync::Arc;
use tokio::sync::Mutex;
use tokio::sync::{mpsc, Mutex};

static GPG_MUTEX: Lazy<Mutex<()>> = Lazy::new(|| Mutex::new(()));
Binary file modified Target/release/Inn.exe
Binary file not shown.
Binary file modified Target/release/InnKeeper.exe
Binary file not shown.
Binary file modified Target/release/PRun.exe
Binary file not shown.
Binary file modified Target/release/Run.exe
Binary file not shown.

0 comments on commit e15797d

Please sign in to comment.