From a22a5113540949b9eb828b132251e00cd72b977f Mon Sep 17 00:00:00 2001 From: Araray Velho Date: Fri, 6 Dec 2024 21:27:19 -0300 Subject: [PATCH] Black run --- plugins/custom_filter.py | 1 - setup.py | 34 +++---- sygnals/cli.py | 149 ++++++++++++++++++++++++------ sygnals/core/__init__.py | 2 +- sygnals/core/audio_handler.py | 10 +- sygnals/core/custom_exec.py | 3 +- sygnals/core/data_handler.py | 17 ++-- sygnals/core/dsp.py | 21 +++-- sygnals/core/filters.py | 12 ++- sygnals/core/plugin_manager.py | 10 +- sygnals/core/storage.py | 3 +- sygnals/core/transforms.py | 13 ++- sygnals/plugins/custom_filter.py | 1 - sygnals/plugins/example_plugin.py | 3 +- sygnals/utils/helpers.py | 2 + sygnals/utils/visualizations.py | 55 ++++++----- tests/generate_test_wav_file.py | 1 - tests/test_cli.py | 11 ++- tests/test_dsp.py | 22 ++++- tools/frequencies.py | 11 ++- 20 files changed, 269 insertions(+), 112 deletions(-) diff --git a/plugins/custom_filter.py b/plugins/custom_filter.py index ba7cabc..0b7f8b2 100644 --- a/plugins/custom_filter.py +++ b/plugins/custom_filter.py @@ -5,4 +5,3 @@ def amplify(data, factor=2): """Amplifies the signal.""" return data * factor - diff --git a/setup.py b/setup.py index 383a3cc..cbc29ca 100644 --- a/setup.py +++ b/setup.py @@ -1,27 +1,27 @@ from setuptools import find_packages, setup setup( - name='sygnals', - version='0.1.0', - description='A versatile CLI for signal and audio processing.', - author='Araray Velho', - author_email='araray@gmail.com', + name="sygnals", + version="0.1.0", + description="A versatile CLI for signal and audio processing.", + author="Araray Velho", + author_email="araray@gmail.com", packages=find_packages(), install_requires=[ - 'numpy', - 'scipy', - 'pandas', - 'pandasql', - 'librosa', - 'soundfile', - 'matplotlib', - 'click', - 'tabulate', - 'pywavelets' + "numpy", + "scipy", + "pandas", + "pandasql", + "librosa", + "soundfile", + "matplotlib", + "click", + "tabulate", + "pywavelets", ], entry_points={ - 'console_scripts': [ - 'sygnals=sygnals.cli:cli', + "console_scripts": [ + "sygnals=sygnals.cli:cli", ], }, ) diff --git a/sygnals/cli.py b/sygnals/cli.py index e95ac7c..128a50c 100644 --- a/sygnals/cli.py +++ b/sygnals/cli.py @@ -9,25 +9,37 @@ from rich.table import Table from tabulate import tabulate -from sygnals.core import (audio_handler, batch_processor, custom_exec, - data_handler, dsp, filters, plugin_manager, storage, - transforms) +from sygnals.core import ( + audio_handler, + batch_processor, + custom_exec, + data_handler, + dsp, + filters, + plugin_manager, + storage, + transforms, +) from sygnals.utils import visualizations # Global Console Object console = Console() + @click.group() def cli(): """Sygnals: A versatile CLI for signal and audio processing.""" pass + @cli.command() @click.argument("file", type=click.Path(exists=True)) -@click.option("--output", type=click.Choice(["json", "csv", "tabulated"]), default="json") +@click.option( + "--output", type=click.Choice(["json", "csv", "tabulated"]), default="json" +) def analyze(file, output): """Analyze a data or audio file.""" - if file.endswith(('.wav', '.mp3')): + if file.endswith((".wav", ".mp3")): data, sr = audio_handler.load_audio(file) # Load both data and sample rate metrics = audio_handler.get_audio_metrics(data, sr) else: @@ -36,18 +48,25 @@ def analyze(file, output): "rows": len(data), "mean": data.mean().to_dict(), "max": data.max().to_dict(), - "min": data.min().to_dict() + "min": data.min().to_dict(), } if output == "json": # Ensure compatibility with JSON serialization - click.echo(json.dumps(metrics, indent=2, default=lambda x: float(x) if isinstance(x, (int, float)) else x)) + click.echo( + json.dumps( + metrics, + indent=2, + default=lambda x: float(x) if isinstance(x, (int, float)) else x, + ) + ) elif output == "csv": pd.DataFrame([metrics]).to_csv("analysis.csv", index=False) click.echo("Analysis saved to analysis.csv") else: click.echo(tabulate(metrics.items(), headers=["Metric", "Value"])) + @cli.command() @click.argument("file", type=click.Path(exists=True)) @click.option("--fft", is_flag=True, help="Apply FFT.") @@ -55,13 +74,19 @@ def analyze(file, output): @click.option("--output", type=click.Path(), required=True) def transform(file, fft, wavelet, output): """Apply transforms (FFT, Wavelet) to data or audio.""" - data = data_handler.read_data(file) if not file.endswith(('.wav', '.mp3')) else audio_handler.load_audio(file) + data = ( + data_handler.read_data(file) + if not file.endswith((".wav", ".mp3")) + else audio_handler.load_audio(file) + ) if fft: - freqs, magnitudes = dsp.compute_fft(data['value'], fs=1) # fs=1 for time-series data + freqs, magnitudes = dsp.compute_fft( + data["value"], fs=1 + ) # fs=1 for time-series data result = pd.DataFrame({"Frequency (Hz)": freqs, "Magnitude": magnitudes}) elif wavelet: - coeffs = transforms.wavelet_transform(data['value'], wavelet) + coeffs = transforms.wavelet_transform(data["value"], wavelet) result = pd.DataFrame({f"Level {i+1}": coeff for i, coeff in enumerate(coeffs)}) else: raise click.UsageError("Specify a transform (FFT or Wavelet).") @@ -69,11 +94,17 @@ def transform(file, fft, wavelet, output): data_handler.save_data(result, output) click.echo(f"Transform saved to {output}") + @cli.command() @click.argument("file", type=click.Path(exists=True)) @click.option("--low-pass", type=float, help="Low-pass filter cutoff frequency (Hz).") @click.option("--high-pass", type=float, help="High-pass filter cutoff frequency (Hz).") -@click.option("--band-pass", nargs=2, type=float, help="Band-pass filter cutoff frequencies (low, high).") +@click.option( + "--band-pass", + nargs=2, + type=float, + help="Band-pass filter cutoff frequencies (low, high).", +) @click.option("--output", type=click.Path(), required=True) def filter(file, low_pass, high_pass, band_pass, output): """Apply filters to a signal or audio.""" @@ -87,17 +118,22 @@ def filter(file, low_pass, high_pass, band_pass, output): elif band_pass: result = filters.band_pass_filter(data["value"], band_pass[0], band_pass[1], fs) else: - raise click.UsageError("Specify a filter type (low-pass, high-pass, or band-pass).") + raise click.UsageError( + "Specify a filter type (low-pass, high-pass, or band-pass)." + ) # Save filtered data filtered_df = pd.DataFrame({"time": data["time"], "value": result}) data_handler.save_data(filtered_df, output) click.echo(f"Filtered signal saved to {output}") + @cli.command() @click.argument("file", type=click.Path(exists=True)) @click.option("--query", type=str, help="SQL query to run on the data.") -@click.option("--filter", type=str, help="Pandas filter expression (e.g., 'value > 10').") +@click.option( + "--filter", type=str, help="Pandas filter expression (e.g., 'value > 10')." +) @click.option("--output", type=click.Path(), required=True) def manipulate(file, query, filter, output): """Manipulate data using SQL or Pandas expressions.""" @@ -119,11 +155,21 @@ def audio(): """Audio processing commands.""" pass + @audio.command() @click.argument("file", type=click.Path(exists=True)) -@click.option("--output", type=click.Path(), help="Export audio data to a file (csv, json, wav).") -@click.option("--format", type=click.Choice(["raw", "csv", "json", "tabulate", "wav"]), default="tabulate", help="Format for displaying or exporting audio data.") -@click.option("--full-output", is_flag=True, help="Force full output instead of truncating.") +@click.option( + "--output", type=click.Path(), help="Export audio data to a file (csv, json, wav)." +) +@click.option( + "--format", + type=click.Choice(["raw", "csv", "json", "tabulate", "wav"]), + default="tabulate", + help="Format for displaying or exporting audio data.", +) +@click.option( + "--full-output", is_flag=True, help="Force full output instead of truncating." +) def show(file, output, format, full_output): """Show audio data and optionally export it.""" data, sr = audio_handler.load_audio(file) @@ -134,7 +180,9 @@ def show(file, output, format, full_output): if format == "raw": # Raw format: space-separated values for piping - raw_output = "\n".join(f"{row.time} {row.amplitude}" for _, row in audio_df.iterrows()) + raw_output = "\n".join( + f"{row.time} {row.amplitude}" for _, row in audio_df.iterrows() + ) if output: with open(output, "w") as f: f.write(raw_output) @@ -166,6 +214,7 @@ def show(file, output, format, full_output): audio_handler.save_audio(data, sr, output) click.echo(f"Audio data exported to {output}") + @audio.command() @click.argument("file", type=click.Path(exists=True)) def info(file): @@ -181,9 +230,12 @@ def info(file): } click.echo(json.dumps(info, indent=2)) + @audio.command() @click.argument("file", type=click.Path(exists=True)) -@click.option("--new-sr", type=int, required=True, help="New sampling rate for the audio file.") +@click.option( + "--new-sr", type=int, required=True, help="New sampling rate for the audio file." +) @click.option("--output", type=click.Path(), required=True) def resample(file, new_sr, output): """Resample audio to a new sampling rate.""" @@ -192,9 +244,15 @@ def resample(file, new_sr, output): audio_handler.save_audio(resampled_data, new_sr, output) click.echo(f"Resampled audio saved to {output} with sampling rate {new_sr} Hz") + @audio.command() @click.argument("file", type=click.Path(exists=True)) -@click.option("--target-amplitude", type=float, required=True, help="Target peak amplitude for normalization.") +@click.option( + "--target-amplitude", + type=float, + required=True, + help="Target peak amplitude for normalization.", +) @click.option("--output", type=click.Path(), required=True) def normalize(file, target_amplitude, output): """Normalize audio to a target amplitude.""" @@ -204,10 +262,19 @@ def normalize(file, target_amplitude, output): audio_handler.save_audio(normalized_data, sr, output) click.echo(f"Normalized audio saved to {output}") + @audio.command() @click.argument("file", type=click.Path(exists=True)) -@click.option("--effect", type=click.Choice(["stretch", "pitch-shift", "compression"]), required=True) -@click.option("--factor", type=float, help="Stretch factor or semitone shift (e.g., 1.5 for stretch, 2 for pitch shift).") +@click.option( + "--effect", + type=click.Choice(["stretch", "pitch-shift", "compression"]), + required=True, +) +@click.option( + "--factor", + type=float, + help="Stretch factor or semitone shift (e.g., 1.5 for stretch, 2 for pitch shift).", +) @click.option("--output", type=click.Path(), required=True) def effect(file, effect, factor, output): """Apply audio effects like time-stretching or pitch-shifting.""" @@ -229,6 +296,7 @@ def effect(file, effect, factor, output): audio_handler.save_audio(result, sr, output) click.echo(f"Effect applied and saved to {output}") + @audio.command() @click.argument("file", type=click.Path(exists=True)) @click.option("--start", type=float, required=True, help="Start time in seconds.") @@ -241,25 +309,39 @@ def slice(file, start, end, output): audio_handler.save_audio(sliced_data, sr, output) click.echo(f"Audio sliced and saved to {output}") + # Register the audio group cli.add_command(audio) + @cli.command() @click.option("--input-dir", type=click.Path(exists=True), required=True) -@click.option("--transform", type=str, required=True, help="Transform to apply (e.g., fft, wavelet).") +@click.option( + "--transform", + type=str, + required=True, + help="Transform to apply (e.g., fft, wavelet).", +) @click.option("--output-dir", type=click.Path(), required=True) def batch(input_dir, transform, output_dir): """Process multiple files in a directory.""" batch_processor.process_batch(input_dir, output_dir, transform) click.echo(f"Batch processing completed. Results saved in {output_dir}") + @cli.command() @click.argument("file", type=click.Path(exists=True)) -@click.option("--type", type=click.Choice(["fft", "spectrogram", "waveform"]), required=True) +@click.option( + "--type", type=click.Choice(["fft", "spectrogram", "waveform"]), required=True +) @click.option("--output", type=click.Path(), required=True) @click.option("--min_freq", type=click.Path(), required=False) @click.option("--max_freq", type=click.Path(), required=False) -@click.option("--extra-params", type=str, help="Additional parameters for visualization libraries in key=value format (comma-separated).") +@click.option( + "--extra-params", + type=str, + help="Additional parameters for visualization libraries in key=value format (comma-separated).", +) def visualize(file, type, output, min_freq, max_freq, extra_params): """Generate visualizations like spectrograms or FFT plots.""" params = {} @@ -267,9 +349,11 @@ def visualize(file, type, output, min_freq, max_freq, extra_params): # Parse key=value pairs for param in extra_params.split(","): key, value = param.split("=") - params[key.strip()] = eval(value.strip()) # Safely evaluate numeric or tuple values + params[key.strip()] = eval( + value.strip() + ) # Safely evaluate numeric or tuple values - if file.endswith(('.wav', '.mp3')): + if file.endswith((".wav", ".mp3")): data, sr = audio_handler.load_audio(file) else: data = data_handler.read_data(file).values.flatten() @@ -286,8 +370,11 @@ def visualize(file, type, output, min_freq, max_freq, extra_params): click.echo(f"{type.capitalize()} visualization saved to {output}") + @cli.command() -@click.option("--list", "list_plugins", is_flag=True, help="List all available plugins.") +@click.option( + "--list", "list_plugins", is_flag=True, help="List all available plugins." +) @click.argument("plugin", required=False) @click.argument("file", type=click.Path(exists=True), required=False) @click.option("--output", type=click.Path(), help="Output file for plugin result.") @@ -311,9 +398,15 @@ def plugin(list_plugins, plugin, file, output): else: raise click.UsageError("Specify --list to view plugins or a plugin to execute.") + @cli.command() @click.argument("expression", type=str) -@click.option("--x-range", type=str, required=True, help="Range for x as start,end,step (e.g., 0,1,0.01).") +@click.option( + "--x-range", + type=str, + required=True, + help="Range for x as start,end,step (e.g., 0,1,0.01).", +) @click.option("--output", type=click.Path(), required=True) def math(expression, x_range, output): """Evaluate a custom mathematical expression.""" diff --git a/sygnals/core/__init__.py b/sygnals/core/__init__.py index 7941290..f484885 100644 --- a/sygnals/core/__init__.py +++ b/sygnals/core/__init__.py @@ -8,5 +8,5 @@ "plugin_manager", "batch_processor", "custom_exec", - "storage" + "storage", ] diff --git a/sygnals/core/audio_handler.py b/sygnals/core/audio_handler.py index cb5e674..a7f402b 100644 --- a/sygnals/core/audio_handler.py +++ b/sygnals/core/audio_handler.py @@ -9,18 +9,22 @@ def load_audio(file_path, sr=None): data, sample_rate = librosa.load(file_path, sr=sr) return data, sample_rate + def save_audio(data, sr, output_path): """Save audio data to a WAV file.""" sf.write(output_path, data, sr) + def save_audio_as_csv(dataframe, output_path): """Save audio data to a CSV file.""" dataframe.to_csv(output_path, index=False) + def save_audio_as_json(dataframe, output_path): """Save audio data to a JSON file.""" dataframe.to_json(output_path, orient="records", indent=2) + # Audio metrics def get_audio_metrics(data, sr): """Calculate audio metrics.""" @@ -31,9 +35,10 @@ def get_audio_metrics(data, sr): return { "rms": float(rms), "peak_amplitude": float(peak_amplitude), - "duration (seconds)": float(duration) + "duration (seconds)": float(duration), } + # Audio slicing def slice_audio(data, sr, start_time, end_time): """Extract a portion of audio data between start_time and end_time.""" @@ -41,15 +46,18 @@ def slice_audio(data, sr, start_time, end_time): end_sample = int(end_time * sr) return data[start_sample:end_sample] + # Audio effects def time_stretch(data, rate): """Stretch audio in time (does not affect pitch).""" return librosa.effects.time_stretch(y=data, rate=rate) + def pitch_shift(data, sr, n_steps): """Shift the pitch of audio.""" return librosa.effects.pitch_shift(y=data, sr=sr, n_steps=n_steps) + def dynamic_range_compression(data, threshold=0.1): """Apply simple dynamic range compression to normalize the audio.""" max_amplitude = np.max(np.abs(data)) diff --git a/sygnals/core/custom_exec.py b/sygnals/core/custom_exec.py index a13d933..00fac2a 100644 --- a/sygnals/core/custom_exec.py +++ b/sygnals/core/custom_exec.py @@ -10,9 +10,10 @@ "exp": np.exp, "log": np.log, "sqrt": np.sqrt, - "pi": np.pi + "pi": np.pi, } + def evaluate_expression(expression, variables): """Safely evaluate a custom mathematical expression.""" safe_locals = {**SAFE_GLOBALS, **variables} diff --git a/sygnals/core/data_handler.py b/sygnals/core/data_handler.py index 707d151..db663ca 100644 --- a/sygnals/core/data_handler.py +++ b/sygnals/core/data_handler.py @@ -5,7 +5,8 @@ import pandasql as ps # Supported formats -SUPPORTED_FORMATS = ['csv', 'json'] +SUPPORTED_FORMATS = ["csv", "json"] + def read_data(file_path): """Load data from a file (CSV or JSON) into a Pandas DataFrame.""" @@ -14,33 +15,37 @@ def read_data(file_path): if file_path == "-": return pd.read_csv(sys.stdin) - if ext == '.csv': + if ext == ".csv": return pd.read_csv(file_path) - elif ext == '.json': + elif ext == ".json": return pd.read_json(file_path) else: raise ValueError(f"Unsupported file format: {ext}") + def save_data(data, output_path): """Save a Pandas DataFrame to a CSV or JSON file.""" _, ext = os.path.splitext(output_path) ext = ext.lower() - if ext == '.csv': + if ext == ".csv": data.to_csv(output_path, index=False) - elif ext == '.json': - data.to_json(output_path, orient='records', indent=2) + elif ext == ".json": + data.to_json(output_path, orient="records", indent=2) else: raise ValueError(f"Unsupported output file format: {ext}") + def run_sql_query(data, query): """Execute an SQL query on a Pandas DataFrame.""" return ps.sqldf(query, locals()) + def filter_data(data, filter_expr): """Filter data using a Pandas-style filter expression.""" return data.query(filter_expr) + def normalize(data): """Normalize the data in a DataFrame.""" return (data - data.min()) / (data.max() - data.min()) diff --git a/sygnals/core/dsp.py b/sygnals/core/dsp.py index 47e287e..ff8baac 100644 --- a/sygnals/core/dsp.py +++ b/sygnals/core/dsp.py @@ -7,45 +7,52 @@ def compute_fft(data, fs=1): """Compute FFT and return frequency and magnitude.""" n = len(data) - freqs = np.fft.fftfreq(n, d=1/fs) + freqs = np.fft.fftfreq(n, d=1 / fs) spectrum = fft(data) return freqs, np.abs(spectrum) + def compute_ifft(spectrum): """Compute inverse FFT from a spectrum.""" return ifft(spectrum).real + # Filtering-related functions -def butterworth_filter(data, cutoff, fs, order=5, filter_type='low'): +def butterworth_filter(data, cutoff, fs, order=5, filter_type="low"): """Apply a Butterworth filter.""" nyquist = 0.5 * fs normal_cutoff = cutoff / nyquist b, a = butter(order, normal_cutoff, btype=filter_type, analog=False) return lfilter(b, a, data) + def low_pass_filter(data, cutoff, fs, order=5): """Apply a low-pass Butterworth filter.""" - return butterworth_filter(data, cutoff, fs, order, 'low') + return butterworth_filter(data, cutoff, fs, order, "low") + def high_pass_filter(data, cutoff, fs, order=5): """Apply a high-pass Butterworth filter.""" - return butterworth_filter(data, cutoff, fs, order, 'high') + return butterworth_filter(data, cutoff, fs, order, "high") + def band_pass_filter(data, low_cutoff, high_cutoff, fs, order=5): """Apply a band-pass Butterworth filter.""" nyquist = 0.5 * fs low = low_cutoff / nyquist high = high_cutoff / nyquist - b, a = butter(order, [low, high], btype='band', analog=False) + b, a = butter(order, [low, high], btype="band", analog=False) return lfilter(b, a, data) + # Convolution-related functions def apply_convolution(data, kernel): """Apply convolution using a custom kernel.""" - return fftconvolve(data, kernel, mode='same') + return fftconvolve(data, kernel, mode="same") + # Window functions -def apply_window(data, window_type='hamming'): +def apply_window(data, window_type="hamming"): """Apply a window function to the data.""" window = get_window(window_type, len(data)) return data * window diff --git a/sygnals/core/filters.py b/sygnals/core/filters.py index 3ed53aa..ad970a2 100644 --- a/sygnals/core/filters.py +++ b/sygnals/core/filters.py @@ -3,29 +3,32 @@ # Butterworth Filter -def butterworth_filter(data, cutoff, fs, order=5, filter_type='low'): +def butterworth_filter(data, cutoff, fs, order=5, filter_type="low"): """Apply a Butterworth filter.""" nyquist = 0.5 * fs normal_cutoff = cutoff / nyquist b, a = butter(order, normal_cutoff, btype=filter_type, analog=False) return lfilter(b, a, data) + # Chebyshev Filter -def chebyshev_filter(data, cutoff, fs, order=5, ripple=0.05, filter_type='low'): +def chebyshev_filter(data, cutoff, fs, order=5, ripple=0.05, filter_type="low"): """Apply a Chebyshev type I filter.""" nyquist = 0.5 * fs normal_cutoff = cutoff / nyquist b, a = cheby1(order, ripple, normal_cutoff, btype=filter_type, analog=False) return lfilter(b, a, data) + # FIR Filter -def fir_filter(data, num_taps, cutoff, fs, filter_type='low'): +def fir_filter(data, num_taps, cutoff, fs, filter_type="low"): """Design and apply an FIR filter.""" nyquist = 0.5 * fs normal_cutoff = cutoff / nyquist - taps = firwin(num_taps, normal_cutoff, pass_zero=(filter_type == 'low')) + taps = firwin(num_taps, normal_cutoff, pass_zero=(filter_type == "low")) return lfilter(taps, 1.0, data) + def low_pass_filter(data, cutoff, fs, order=5): """Apply a low-pass Butterworth filter.""" nyquist = 0.5 * fs @@ -33,6 +36,7 @@ def low_pass_filter(data, cutoff, fs, order=5): b, a = butter(order, normal_cutoff, btype="low", analog=False) return lfilter(b, a, data) + def high_pass_filter(data, cutoff, fs, order=5): """Apply a high-pass Butterworth filter.""" nyquist = 0.5 * fs diff --git a/sygnals/core/plugin_manager.py b/sygnals/core/plugin_manager.py index 1f39fb5..8ef2a10 100644 --- a/sygnals/core/plugin_manager.py +++ b/sygnals/core/plugin_manager.py @@ -1,7 +1,8 @@ import importlib.util import os -PLUGIN_DIR = './plugins/' +PLUGIN_DIR = "./plugins/" + def discover_plugins(plugin_dir=PLUGIN_DIR): """Discover Python plugins in the plugins directory.""" @@ -10,7 +11,7 @@ def discover_plugins(plugin_dir=PLUGIN_DIR): plugins = {} for filename in os.listdir(plugin_dir): - if filename.endswith('.py'): + if filename.endswith(".py"): module_name = filename[:-3] module_path = os.path.join(plugin_dir, filename) spec = importlib.util.spec_from_file_location(module_name, module_path) @@ -18,10 +19,13 @@ def discover_plugins(plugin_dir=PLUGIN_DIR): spec.loader.exec_module(module) for attr_name in dir(module): attr = getattr(module, attr_name) - if callable(attr) and hasattr(attr, "__plugin__"): # Only consider functions marked as plugins + if callable(attr) and hasattr( + attr, "__plugin__" + ): # Only consider functions marked as plugins plugins[attr_name] = attr return plugins + def register_plugin(func): """Mark a function as a plugin.""" func.__plugin__ = True diff --git a/sygnals/core/storage.py b/sygnals/core/storage.py index 63266e2..40e26a9 100644 --- a/sygnals/core/storage.py +++ b/sygnals/core/storage.py @@ -6,9 +6,10 @@ def save_to_database(data, db_path, table_name): """Save a Pandas DataFrame to an SQLite database.""" conn = sqlite3.connect(db_path) - data.to_sql(table_name, conn, if_exists='replace', index=False) + data.to_sql(table_name, conn, if_exists="replace", index=False) conn.close() + def query_database(db_path, query): """Execute an SQL query on the SQLite database.""" conn = sqlite3.connect(db_path) diff --git a/sygnals/core/transforms.py b/sygnals/core/transforms.py index a982d3c..495b2b2 100644 --- a/sygnals/core/transforms.py +++ b/sygnals/core/transforms.py @@ -8,24 +8,31 @@ def fft(data): """Compute the FFT of a signal.""" return np.fft.fft(data) + def ifft(spectrum): """Compute the inverse FFT.""" return np.fft.ifft(spectrum).real + # Wavelet Transform -def wavelet_transform(data, wavelet='db4', level=3): +def wavelet_transform(data, wavelet="db4", level=3): """Perform Wavelet Transform.""" coeffs = pywt.wavedec(data, wavelet, level=level) return coeffs -def wavelet_reconstruction(coeffs, wavelet='db4'): + +def wavelet_reconstruction(coeffs, wavelet="db4"): """Reconstruct signal from Wavelet coefficients.""" return pywt.waverec(coeffs, wavelet) + # Laplace Transform def laplace_transform(data, s_values): """Compute Laplace Transform numerically.""" - return np.array([np.sum(data * np.exp(-s * np.arange(len(data)))) for s in s_values]) + return np.array( + [np.sum(data * np.exp(-s * np.arange(len(data)))) for s in s_values] + ) + def inverse_laplace_transform(transform, t_values): """Compute inverse Laplace Transform numerically (simplified example).""" diff --git a/sygnals/plugins/custom_filter.py b/sygnals/plugins/custom_filter.py index a2ce777..21c2848 100644 --- a/sygnals/plugins/custom_filter.py +++ b/sygnals/plugins/custom_filter.py @@ -5,4 +5,3 @@ def amplify_signal(data, factor=2): """Amplify the signal values by a given factor.""" return data * factor - diff --git a/sygnals/plugins/example_plugin.py b/sygnals/plugins/example_plugin.py index aa8bfbc..8ec64b7 100644 --- a/sygnals/plugins/example_plugin.py +++ b/sygnals/plugins/example_plugin.py @@ -14,7 +14,8 @@ def custom_filter(data, alpha=0.5): smoothed_data.append(alpha * value + (1 - alpha) * smoothed_data[-1]) return np.array(smoothed_data) + @register_plugin def square_signal(data): """Square the signal values.""" - return data ** 2 + return data**2 diff --git a/sygnals/utils/helpers.py b/sygnals/utils/helpers.py index 8be800f..55c210d 100644 --- a/sygnals/utils/helpers.py +++ b/sygnals/utils/helpers.py @@ -6,11 +6,13 @@ def create_directory(path): if not os.path.exists(path): os.makedirs(path) + def get_file_extension(file_path): """Extract the file extension from a path.""" _, ext = os.path.splitext(file_path) return ext.lower() + def validate_file_format(file_path, supported_formats): """Check if the file has a supported format.""" ext = get_file_extension(file_path) diff --git a/sygnals/utils/visualizations.py b/sygnals/utils/visualizations.py index 942b4c5..5c75290 100644 --- a/sygnals/utils/visualizations.py +++ b/sygnals/utils/visualizations.py @@ -7,15 +7,18 @@ def plot_spectrogram_old(data, sr, output_file): """Generate and save a spectrogram plot.""" f, t, Sxx = spectrogram(data, sr) plt.figure(figsize=(10, 6)) - plt.pcolormesh(t, f, 10 * np.log10(Sxx), shading='gouraud') - plt.ylabel('Frequency [Hz]') - plt.xlabel('Time [sec]') - plt.colorbar(label='Power [dB]') - plt.title('Spectrogram') + plt.pcolormesh(t, f, 10 * np.log10(Sxx), shading="gouraud") + plt.ylabel("Frequency [Hz]") + plt.xlabel("Time [sec]") + plt.colorbar(label="Power [dB]") + plt.title("Spectrogram") plt.savefig(output_file) plt.close() -def plot_spectrogram(data, sr, output_file, f_min=None, f_max=None, window='hann', nperseg=1024): + +def plot_spectrogram( + data, sr, output_file, f_min=None, f_max=None, window="hann", nperseg=1024 +): """ Generate and save a spectrogram plot with configurable frequency range and window parameters. @@ -35,55 +38,61 @@ def plot_spectrogram(data, sr, output_file, f_min=None, f_max=None, window='hann plt.figure(figsize=(10, 6)) # Plot the spectrogram with log scale - plt.pcolormesh(t, f, 10 * np.log10(Sxx + 1e-10), # Add small constant to prevent log(0) - shading='gouraud') + plt.pcolormesh( + t, + f, + 10 * np.log10(Sxx + 1e-10), # Add small constant to prevent log(0) + shading="gouraud", + ) # Set the frequency limits if specified if f_min is not None or f_max is not None: f_min = 0 if f_min is None else f_min - f_max = sr/2 if f_max is None else f_max + f_max = sr / 2 if f_max is None else f_max plt.ylim(f_min, f_max) # Customize the plot appearance - plt.ylabel('Frequency [Hz]') - plt.xlabel('Time [sec]') - plt.colorbar(label='Power [dB]') - plt.title('Spectrogram') + plt.ylabel("Frequency [Hz]") + plt.xlabel("Time [sec]") + plt.colorbar(label="Power [dB]") + plt.title("Spectrogram") # Add grid for better readability - plt.grid(True, alpha=0.3, linestyle='--') + plt.grid(True, alpha=0.3, linestyle="--") # Tight layout to prevent label clipping plt.tight_layout() # Save and close - plt.savefig(output_file, dpi=300, bbox_inches='tight') + plt.savefig(output_file, dpi=300, bbox_inches="tight") plt.close() + def plot_fft(data, sr, output_file): """Generate and save an FFT plot.""" n = len(data) - freqs = np.fft.fftfreq(n, d=1/sr) + freqs = np.fft.fftfreq(n, d=1 / sr) spectrum = np.fft.fft(data) magnitude = np.abs(spectrum) plt.figure(figsize=(10, 6)) - plt.plot(freqs[:n // 2], magnitude[:n // 2]) # Only plot positive frequencies - plt.xlabel('Frequency (Hz)') - plt.ylabel('Magnitude') - plt.title('FFT Spectrum') + plt.plot(freqs[: n // 2], magnitude[: n // 2]) # Only plot positive frequencies + plt.xlabel("Frequency (Hz)") + plt.ylabel("Magnitude") + plt.title("FFT Spectrum") plt.grid() plt.savefig(output_file) plt.close() + def plot_waveform(data, sr, output_file): """Generate and save a waveform plot.""" time = np.arange(len(data)) / sr plt.figure(figsize=(10, 6)) plt.plot(time, data) - plt.xlabel('Time (seconds)') - plt.ylabel('Amplitude') - plt.title('Waveform') + plt.xlabel("Time (seconds)") + plt.ylabel("Amplitude") + plt.title("Waveform") plt.grid() plt.savefig(output_file) plt.close() diff --git a/tests/generate_test_wav_file.py b/tests/generate_test_wav_file.py index feb73fa..560f677 100644 --- a/tests/generate_test_wav_file.py +++ b/tests/generate_test_wav_file.py @@ -10,4 +10,3 @@ # Save as a WAV file sf.write("test_audio.wav", sine_wave, sampling_rate) - diff --git a/tests/test_cli.py b/tests/test_cli.py index 3b530db..4cf3ae8 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -10,13 +10,18 @@ def setUp(self): self.runner = CliRunner() def test_analyze_command(self): - result = self.runner.invoke(cli, ['analyze', 'sample.csv', '--output', 'json']) + result = self.runner.invoke(cli, ["analyze", "sample.csv", "--output", "json"]) self.assertEqual(result.exit_code, 0) def test_transform_command(self): - result = self.runner.invoke(cli, ['transform', 'sample.csv', '--fft', '--output', 'output.csv']) + result = self.runner.invoke( + cli, ["transform", "sample.csv", "--fft", "--output", "output.csv"] + ) self.assertEqual(result.exit_code, 0) def test_filter_command(self): - result = self.runner.invoke(cli, ['filter', 'sample.csv', '--low-pass', '100', '--output', 'filtered.csv']) + result = self.runner.invoke( + cli, + ["filter", "sample.csv", "--low-pass", "100", "--output", "filtered.csv"], + ) self.assertEqual(result.exit_code, 0) diff --git a/tests/test_dsp.py b/tests/test_dsp.py index dbf5c12..bbe0629 100644 --- a/tests/test_dsp.py +++ b/tests/test_dsp.py @@ -2,19 +2,29 @@ import numpy as np -from sygnals.core.dsp import (apply_window, band_pass_filter, compute_fft, - compute_ifft, high_pass_filter, low_pass_filter) +from sygnals.core.dsp import ( + apply_window, + band_pass_filter, + compute_fft, + compute_ifft, + high_pass_filter, + low_pass_filter, +) class TestDSP(unittest.TestCase): def setUp(self): self.sample_rate = 1000 # Sampling rate in Hz - self.signal = np.sin(2 * np.pi * 10 * np.linspace(0, 1, self.sample_rate)) # 10 Hz sine wave + self.signal = np.sin( + 2 * np.pi * 10 * np.linspace(0, 1, self.sample_rate) + ) # 10 Hz sine wave def test_fft(self): freqs, spectrum = compute_fft(self.signal, self.sample_rate) self.assertEqual(len(freqs), len(self.signal)) - self.assertTrue(np.isclose(max(spectrum), self.sample_rate / 2)) # Peak in magnitude + self.assertTrue( + np.isclose(max(spectrum), self.sample_rate / 2) + ) # Peak in magnitude def test_ifft(self): _, spectrum = compute_fft(self.signal, self.sample_rate) @@ -30,7 +40,9 @@ def test_high_pass_filter(self): self.assertEqual(len(filtered_signal), len(self.signal)) def test_band_pass_filter(self): - filtered_signal = band_pass_filter(self.signal, low_cutoff=5, high_cutoff=15, fs=self.sample_rate) + filtered_signal = band_pass_filter( + self.signal, low_cutoff=5, high_cutoff=15, fs=self.sample_rate + ) self.assertEqual(len(filtered_signal), len(self.signal)) def test_apply_window(self): diff --git a/tools/frequencies.py b/tools/frequencies.py index 1e97c6a..4da0bd4 100644 --- a/tools/frequencies.py +++ b/tools/frequencies.py @@ -9,7 +9,7 @@ def process_wav_json(json_data, output_file, sample_rate=44100): """ Processes WAV-like data in JSON format, performs FFT, and saves a frequency spectrum plot to a file. - + Parameters: json_data (str or dict): The JSON data as a string or dictionary. output_file (str): The path to save the frequency spectrum plot. @@ -23,7 +23,7 @@ def process_wav_json(json_data, output_file, sample_rate=44100): # Extract amplitude values amplitudes = [point["amplitude"] for point in data] - + # Perform FFT fft_result = np.fft.fft(amplitudes) frequencies = np.fft.fftfreq(len(amplitudes), 1 / sample_rate) @@ -33,21 +33,22 @@ def process_wav_json(json_data, output_file, sample_rate=44100): # Plot the positive half of the frequency spectrum plt.figure(figsize=(10, 6)) - plt.plot(frequencies[:len(frequencies)//2], magnitude[:len(magnitude)//2]) + plt.plot(frequencies[: len(frequencies) // 2], magnitude[: len(magnitude) // 2]) plt.title("Frequency Spectrum") plt.xlabel("Frequency (Hz)") plt.ylabel("Amplitude") plt.grid(True) - + # Save the plot to the specified file plt.savefig(output_file) plt.close() print(f"Frequency spectrum plot saved to: {output_file}") + if __name__ == "__main__": if len(sys.argv) > 1: data_file = sys.argv[1] - output_file = data_file + '.png' + output_file = data_file + ".png" with open(data_file) as data: json_data = json.load(data) process_wav_json(json_data, output_file, sample_rate=44100)