diff --git a/.github/workflows/python-publish.yml b/.github/workflows/python-publish.yml
new file mode 100644
index 0000000..4f9a079
--- /dev/null
+++ b/.github/workflows/python-publish.yml
@@ -0,0 +1,31 @@
+# This workflow will upload a Python Package using Twine when a release is created
+# For more information see: https://help.github.com/en/actions/language-and-framework-guides/using-python-with-github-actions#publishing-to-package-registries
+
+name: Upload Python Package
+
+on:
+  release:
+    types: [published]
+
+jobs:
+  deploy:
+
+    runs-on: ubuntu-latest
+
+    steps:
+    - uses: actions/checkout@v2
+    - name: Set up Python
+      uses: actions/setup-python@v2
+      with:
+        python-version: "3.x"
+    - name: Install dependencies
+      run: |
+        python -m pip install --upgrade pip
+        pip install setuptools wheel twine
+    - name: Build and publish
+      env:
+        TWINE_USERNAME: ${{ secrets.PYPI_USERNAME }}
+        TWINE_PASSWORD: ${{ secrets.PYPI_PASSWORD }}
+      run: |
+        python setup.py sdist bdist_wheel
+        twine upload dist/*
diff --git a/.github/workflows/python-tests.yml b/.github/workflows/python-tests.yml
new file mode 100644
index 0000000..bd478e5
--- /dev/null
+++ b/.github/workflows/python-tests.yml
@@ -0,0 +1,46 @@
+# This workflow will install Python dependencies, run tests and lint with a variety of Python versions
+# For more information see: https://help.github.com/actions/language-and-framework-guides/using-python-with-github-actions
+
+name: Python tests
+
+on:
+  push:
+    branches: [master, develop]
+  pull_request:
+    branches: [master, develop]
+
+jobs:
+  test:
+    runs-on: ubuntu-20.04
+
+    strategy:
+      fail-fast: false
+      matrix:
+        python-version: ["3.7", "3.8", "3.9", "3.10", "3.11"]
+
+    steps:
+    - uses: actions/checkout@v2
+      with:
+        fetch-depth: 0
+    - name: Set up Python ${{ matrix.python-version }}
+      uses: actions/setup-python@v2
+      with:
+        python-version: ${{ matrix.python-version }}
+    - name: Install dependencies
+      run: |
+        python -m pip install --upgrade pip
+        python -m pip install flake8 pytest setuptools wheel
+        if [ -f requirements.txt ]; then pip install -r requirements.txt; fi
+    - name: Lint with flake8
+      run: |
+        # stop the build if there are Python syntax errors or undefined names
+        flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics
+        # exit-zero treats all errors as warnings. The GitHub editor is 127 chars wide
+        flake8 . --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics
+    - name: Test with pytest
+      run: |
+        pytest
+    - name: Install
+      run: |
+        python setup.py sdist bdist_wheel
+        pip install dist/pylabtools-*.whl
diff --git a/.gitignore b/.gitignore
index 6ea1fbd..9987c16 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,7 +1,7 @@
 main.py
+*.log
+test.py
+
 __pycache__
 build
-dist
-tin_utility.egg-info
-text.txt
-py_utility.egg-info
\ No newline at end of file
+dist
\ No newline at end of file
diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml
deleted file mode 100644
index c5032ce..0000000
--- a/.gitlab-ci.yml
+++ /dev/null
@@ -1,26 +0,0 @@
-# Base image: really weightless
-image: python:latest
-
-stages:
-  - develop
-  - production
-
-develop:
-  stage: develop
-  script: python setup.py bdist_wheel
-  artifacts:
-    paths:
-      - dist/
-    expire_in: 1 week
-  only:
-    - develop
-
-master:
-  stage: production
-  script:
-    - pip install twine
-    - python setup.py sdist bdist_wheel
-    - TWINE_PASSWORD=${CI_JOB_TOKEN} TWINE_USERNAME=gitlab-ci-token python -m twine upload --verbose --repository-url https://gitlab.com/api/v4/projects/${CI_PROJECT_ID}/packages/pypi dist/*
-  only:
-    - master
-    - /^(v)[0-9]*.[0-9]*.[0-9]*$/
\ No newline at end of file
diff --git a/README.md b/README.md
index e69de29..a7285b9 100644
--- a/README.md
+++ b/README.md
@@ -0,0 +1,23 @@
+# pylabtools
+
+`pylabtools` is a Python utility library that provides various helper functions for file and directory manipulation, text formatting, and logging.
+
+## Installation
+
+You can install pylabtools using pip:
+
+```bash
+pip install pylabtools
+```
+
+## Testing
+
+You can run the unit tests using pytest:
+
+```bash
+pytest
+```
+
+## Contributing
+
+If you find a bug or have an idea for a new feature, please open an issue on the GitHub repository. Pull requests are welcome!
\ No newline at end of file
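A quick usage sketch to go with the README — the wrapper modules are the ones added later in this diff, and the paths are hypothetical:

```python
# Minimal usage sketch of the wrappers added in this PR.
# "config.json" and "data/" are hypothetical example paths.
from pylabtools import file_wrapper as fw
from pylabtools import path_wrapper as pw

config = fw.read_json_config("config.json")  # parsed JSON as a dict
for path in pw.get_all_files("data", endswith=".txt"):
    text = fw.read_file_all_text(path)
    fw.write_text_to_file(fw.get_file_name(path, tail="_copy"), text)
```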
diff --git a/py_utility/__init__.py b/py_utility/__init__.py
deleted file mode 100644
index 51b5161..0000000
--- a/py_utility/__init__.py
+++ /dev/null
@@ -1,3 +0,0 @@
-from .file import *
-from .log import *
-from .path import *
\ No newline at end of file
diff --git a/py_utility/file.py b/py_utility/file.py
deleted file mode 100644
index 5a5b94d..0000000
--- a/py_utility/file.py
+++ /dev/null
@@ -1,35 +0,0 @@
-import codecs
-import json
-import pathlib
-
-def read_file_to_text(path: str,encoding = "utf-8"):
-    with codecs.open(path, "r", encoding= encoding) as f:
-        text = f.read()
-    return text
-
-def write_text(path_file="output.txt", content="", encoding="utf-8"):
-    with codecs.open(path_file, "w", encoding=encoding) as f:
-        f.write(content)
-
-def read_file_config(path_config: str):
-    """
-    return dict of config
-    """
-    with codecs.open(path_config, "r", encoding="utf-8") as f:
-        config = f.read()
-    preprocess_config = json.loads(config)
-    return preprocess_config
-
-def get_file_name_without_extension(path_file: str):
-    return pathlib.Path(path_file).stem
-
-def get_file_name(path_file: str, tail="", without_extension=False, set_extension=None):
-    """
-    for rename filename from path or filename
-    """
-    if set_extension == None:
-        set_extension = pathlib.Path(path_file).suffix
-    if without_extension:
-        return pathlib.Path(path_file).stem+"%s" % (tail)
-    return pathlib.Path(path_file).stem+"%s%s" % (tail, set_extension)
-
diff --git a/py_utility/log.py b/py_utility/log.py
deleted file mode 100644
index cfe425f..0000000
--- a/py_utility/log.py
+++ /dev/null
@@ -1,26 +0,0 @@
-import logging
-
-def set_logfile(path_file: str, mode='a', encoding='utf-8'):
-    """
-    Set the initial configuration for the log file to be written.
-    """
-    root_logger = logging.getLogger()
-    root_logger.setLevel(logging.DEBUG)
-    handler = logging.FileHandler(path_file, mode, encoding)
-    handler.setFormatter(logging.Formatter(
-        '%(asctime)s(%(levelname)s) - %(message)s'))
-    root_logger.addHandler(handler)
-
-def log_level(level: int, message: str, type=logging.INFO):
-    "Log a message, indented according to its level."
-    message = str(message)
-    space = "\t"*(level-1)
-    log_message = space+message
-    if type == logging.INFO:
-        logging.info(log_message)
-    elif type == logging.ERROR:
-        logging.error(log_message)
-    elif type == logging.WARNING:
-        logging.warning(log_message)
-    else:
-        logging.warning(log_message)
\ No newline at end of file
diff --git a/py_utility/path.py b/py_utility/path.py
deleted file mode 100644
index 082cd40..0000000
--- a/py_utility/path.py
+++ /dev/null
@@ -1,41 +0,0 @@
-import os
-
-def get_all_files(path_input: str, endswith=".txt"):
-    """
-    Read every file under the given path (descending into subfolders as well).
-    """
-    if os.path.isfile(path_input):
-        return [path_input]
-    buffer_files = []
-    for root, dirs, files in os.walk(path_input):
-        for file in files:
-            if file.endswith(endswith):
-                buffer_files.append(os.path.join(root, file))
-    return buffer_files
-
-
-def get_current_folder_name(path_file: str):
-    if os.path.isfile(path_file):
-        return os.path.basename(os.path.dirname(path_file))
-    return os.path.basename((os.path.abspath(path_file)))
-
-
-def get_previous_path(dir: str, previous=1):
-    if previous < 1:
-        return dir
-    dir = os.path.abspath(dir)
-    for i in range(previous):
-        state_dir = dir
-        dir = os.path.dirname(dir)
-        if state_dir == dir:
-            break
-    return dir
-
-def get_all_directory(path_input):
-    """
-    get all directory in your path input
-    return list of path is directory
-    """
-    path_input = os.path.abspath(path_input)
-    return [os.path.join(path_input, name) for name in os.listdir(path_input)
-            if os.path.isdir(os.path.join(path_input, name))]
\ No newline at end of file
diff --git a/pylabtools/__init__.py b/pylabtools/__init__.py
new file mode 100644
index 0000000..e97c81c
--- /dev/null
+++ b/pylabtools/__init__.py
@@ -0,0 +1,8 @@
+import os
+
+files = os.listdir(__path__[0])
+modules = (
+    x.replace(".py", "") for x in files if x.endswith(".py") and not x.startswith("__")
+)
+for module in modules:
+    __import__("pylabtools." + module)
\ No newline at end of file
diff --git a/pylabtools/file_wrapper.py b/pylabtools/file_wrapper.py
new file mode 100644
index 0000000..93066a1
--- /dev/null
+++ b/pylabtools/file_wrapper.py
@@ -0,0 +1,89 @@
+import json
+from pathlib import Path
+from typing import Generator, Optional
+
+def read_file_all_text(path: str, encoding: str = "utf-8") -> str:
+    """
+    Reads the entire content of a file and returns it as a single string.
+
+    Args:
+        path (str): The path to the file.
+        encoding (str, optional): The encoding of the file. Defaults to "utf-8".
+
+    Returns:
+        str: The entire content of the file.
+    """
+    with open(path, "r", encoding=encoding) as f:
+        return f.read()
+
+def stream_file_by_line(path: str, encoding: str = "utf-8") -> Generator[str, None, None]:
+    """
+    Streams the content of a file line by line.
+
+    Args:
+        path (str): The path to the file.
+        encoding (str, optional): The encoding of the file. Defaults to "utf-8".
+
+    Yields:
+        Generator[str, None, None]: Each line from the file.
+    """
+    with open(path, "r", encoding=encoding) as f:
+        for line in f:
+            yield line.rstrip('\r\n')
+
+
+def write_text_to_file(path: str, content: str, encoding: str = "utf-8") -> None:
+    """Write text to file
+
+    Args:
+        path (str): Path to the output file.
+        content (str): Content to write.
+        encoding (str, optional): Encoding of the file. Defaults to "utf-8".
+    """
+    with open(path, "w", encoding=encoding) as f:
+        f.write(content)
+
+
+def read_json_config(path: str) -> dict:
+    """Read JSON config file
+
+    Args:
+        path (str): Path to the config file.
+
+    Returns:
+        dict: Configuration as a dictionary.
+    """
+    with open(path, "r", encoding="utf-8") as f:
+        return json.load(f)
+
+
+def get_file_name_without_extension(path: str) -> str:
+    """Get file name without extension
+
+    Args:
+        path (str): Path to the file.
+
+    Returns:
+        str: File name without extension.
+    """
+    return Path(path).stem
+
+
+def get_file_name(path: str, tail: str = "", set_extension: Optional[str] = None, without_extension: bool = False) -> str:
+    """Get file name with optional modifications
+
+    Args:
+        path (str): Path to the file.
+        tail (str, optional): Additional tail for the file name. Defaults to "".
+        set_extension (str, optional): Desired file extension. If None, uses original extension. Defaults to None.
+        without_extension (bool, optional): If True, returns file name without extension. Defaults to False.
+
+    Returns:
+        str: Modified file name.
+    """
+    file_name = Path(path).stem
+    if without_extension:
+        return file_name + tail
+    if not set_extension:
+        set_extension = Path(path).suffix
+    return file_name + tail + set_extension
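The `get_file_name` options above combine in non-obvious ways; a few concrete calls (paths illustrative) show the intended results:

```python
from pylabtools import file_wrapper as fw

fw.get_file_name("logs/run.txt", tail="_v2")            # "run_v2.txt"
fw.get_file_name("logs/run.txt", set_extension=".csv")  # "run.csv"
fw.get_file_name("logs/run.txt", tail="_v2",
                 without_extension=True)                # "run_v2"

# stream_file_by_line keeps memory flat on large files;
# each yielded line has its trailing "\r\n" stripped.
for line in fw.stream_file_by_line("logs/run.txt"):
    print(line)
```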
diff --git a/pylabtools/log_wropper.py b/pylabtools/log_wropper.py
new file mode 100644
index 0000000..ba2a899
--- /dev/null
+++ b/pylabtools/log_wropper.py
@@ -0,0 +1,87 @@
+import logging
+from typing import Optional, Union
+from pathlib import Path
+import json
+
+class JSONFormatter(logging.Formatter):
+    """
+    A custom formatter for logging messages as JSON.
+
+    Methods:
+        format: Returns the log message formatted as a JSON string.
+    """
+
+    def format(self, record: logging.LogRecord) -> str:
+        """Return the log message formatted as JSON."""
+        log_data = {
+            'timestamp': record.created,
+            'level': record.levelname,
+            'message': record.getMessage(),
+            'module': record.module,
+            'line': record.lineno,
+            'funcName': record.funcName,
+            'pathname': record.pathname
+        }
+        return json.dumps(log_data, ensure_ascii=False)
+
+class LoggerSetup:
+    """
+    A class to configure a logger with options for JSON formatted logging to file and console.
+
+    Methods:
+        add_file_handler: Adds a file handler to the logger.
+        add_console_handler: Adds a console handler to the logger.
+        set_custom_format: Sets a custom formatter for logging messages.
+        get_logger: Returns the configured logger.
+    """
+
+    def __init__(self, log_level: int = logging.DEBUG):
+        """Initialize the LoggerSetup with the desired log level.
+
+        Examples:
+            >>> logger_setup = LoggerSetup(log_level=logging.DEBUG)
+            >>> custom_format = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
+            >>> logger_setup.set_custom_format(custom_format)
+            >>> logger_setup.add_file_handler("logfile.log")
+            >>> logger_setup.add_console_handler()
+            >>> logger = logger_setup.get_logger()
+            >>> logger.debug("This is a debug message.")
+        """
+        self.logger = logging.getLogger()
+        self.logger.setLevel(log_level)
+        self.formatter = JSONFormatter()
+
+    def add_file_handler(self, log_file_path: Optional[Path] = None, mode: str = 'a', encoding: str = 'utf-8') -> None:
+        """
+        Add a file handler to the logger.
+
+        Args:
+            log_file_path (Path): The path to the log file. Defaults to None.
+            mode (str): File mode. Defaults to 'a'.
+            encoding (str): File encoding. Defaults to 'utf-8'.
+        """
+        if log_file_path:
+            file_handler = logging.FileHandler(log_file_path, mode=mode, encoding=encoding)
+            file_handler.setFormatter(self.formatter)
+            self.logger.addHandler(file_handler)
+
+    def add_console_handler(self) -> None:
+        """Add a console handler to the logger."""
+        console_handler = logging.StreamHandler()
+        console_handler.setFormatter(self.formatter)
+        self.logger.addHandler(console_handler)
+
+    def set_custom_format(self, fmt: Union[logging.Formatter, None]) -> None:
+        """
+        Set a custom formatter for logging messages.
+
+        Args:
+            fmt (logging.Formatter | None): The desired logging formatter.
+        """
+        for handler in self.logger.handlers:
+            handler.setFormatter(fmt)
+        self.formatter = fmt
+
+    def get_logger(self) -> logging.Logger:
+        """Return the configured logger."""
+        return self.logger
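A sketch of what `JSONFormatter` output looks like in practice — the field values below are illustrative, derived from the keys the formatter emits:

```python
import logging
from pylabtools import log_wropper as lw

setup = lw.LoggerSetup(log_level=logging.INFO)
setup.add_console_handler()
setup.get_logger().info("experiment started")
# prints a single-line JSON record, roughly:
# {"timestamp": 1700000000.0, "level": "INFO", "message": "experiment started",
#  "module": "run", "line": 7, "funcName": "<module>", "pathname": "run.py"}
```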
+ """ + def __init__(self, endpoint: str, access_key: str = None, secret_key: str = None, secure: bool = True): + self.minio_client = Minio(endpoint, access_key=access_key, secret_key=secret_key, secure=secure) + logging.info(f"Minio client created for endpoint: {endpoint}") + self.endpoint = endpoint + self.access_key = access_key + self.secret_key = secret_key + + @staticmethod + def get_all_file_paths(directory: str) -> Tuple[List[str], List[str]]: + """ + Retrieves the absolute paths and relative paths of all files in the given directory. + + Parameters: + - directory (str): The directory whose files' paths are to be retrieved. + + Returns: + - Tuple[List[str], List[str]]: A tuple containing two lists: + 1. A list of absolute file paths. + 2. A list of relative file paths from the input directory. + """ + directory = os.path.abspath(directory) + abspath_files = [os.path.join(root, filename) + for root, _, files in os.walk(directory) for filename in files] + relative_paths = [file[len(directory)+1:] for file in abspath_files] + return abspath_files, relative_paths + + @staticmethod + def upload_file(args: Tuple[str, str, str, str, str, str]) -> str: + """ + Uploads a single file to a MinIO bucket. + + Parameters: + - args (Tuple[str, str, str, str, str, str]): A tuple containing the following: + 1. Minio server endpoint. + 2. Access key for Minio server. + 3. Secret key for Minio server. + 4. Target Minio bucket name. + 5. Local path of the file to be uploaded. + 6. Remote path (including filename) where the file will be stored in the bucket. + + Returns: + - str: A string indicating the success status ("Success") or the error message. + """ + minio_endpoint, minio_access_key, minio_secret_key, bucket_name, local_path, remote_path = args + minio_client = Minio(minio_endpoint, minio_access_key, minio_secret_key) + try: + minio_client.fput_object(bucket_name, remote_path, local_path) + return "Success" + except Exception as err: + return f"Upload Error for {local_path} : {err}" + + def upload(self, bucket_name: str, path_local_upload: str, prefix: str = "") -> None: + """ + Uploads files or directories to a specified MinIO bucket. + + If the provided path represents a directory, all files within it are uploaded with their + relative paths maintained in the bucket. If the path represents a single file, only that file + is uploaded. The upload leverages multiprocessing for enhanced speed. + + Parameters: + - bucket_name (str): The target Minio bucket where the files/directories will be uploaded. + - path_local_upload (str): The local path of the file or directory to be uploaded. + - prefix (str, optional): The prefix or folder name within the bucket where the files will be uploaded. Defaults to "". + + Returns: + - None: Files are uploaded to the MinIO bucket and no explicit return value is provided. + + Raises: + - Exceptions related to file upload will be logged. 
diff --git a/pylabtools/path_wrapper.py b/pylabtools/path_wrapper.py
new file mode 100644
index 0000000..6206e8c
--- /dev/null
+++ b/pylabtools/path_wrapper.py
@@ -0,0 +1,78 @@
+import os
+from typing import List
+
+def get_all_files(path_input: str, endswith: str = None, recursive: bool = True) -> List[str]:
+    """
+    Get all files from the specified path.
+
+    Args:
+    - path_input (str): Path to start the search from.
+    - endswith (str, optional): File extension filter. If None, returns all files.
+    - recursive (bool, optional): If True, search for files recursively. Defaults to True.
+
+    Returns:
+    - List[str]: List of file paths.
+    """
+    if os.path.isfile(path_input):
+        return [path_input]
+
+    if recursive:
+        files = [
+            os.path.join(root, file)
+            for root, _, files in os.walk(path_input)
+            for file in files
+            if endswith is None or file.endswith(endswith)
+        ]
+    else:
+        files = [
+            os.path.join(path_input, file)
+            for file in os.listdir(path_input)
+            if os.path.isfile(os.path.join(path_input, file)) and (endswith is None or file.endswith(endswith))
+        ]
+
+    return files
+
+def get_current_folder_name(path: str) -> str:
+    """
+    Get the name of the folder containing the specified path.
+
+    Args:
+    - path (str): Path to a file or folder.
+
+    Returns:
+    - str: Name of the containing folder.
+    """
+    return os.path.basename(os.path.dirname(os.path.abspath(path)))
+
+def get_previous_path(path_input: str, previous: int = 1) -> str:
+    """
+    Returns the path `previous` directories above `path_input`.
+
+    Args:
+    - path_input (str): The starting path.
+    - previous (int, optional): The number of directories to go up. Defaults to 1.
+
+    Returns:
+    - str: The resulting path.
+    """
+    path = os.path.abspath(path_input)
+    for _ in range(previous):
+        path = os.path.dirname(path)
+    return path
+
+def get_all_directories(path_input: str) -> List[str]:
+    """
+    Get all directories from the specified path.
+
+    Args:
+    - path_input (str): Path to start the search from.
+
+    Returns:
+    - List[str]: List of directory paths.
+    """
+    path_input = os.path.abspath(path_input)
+    return [
+        os.path.join(path_input, name)
+        for name in os.listdir(path_input)
+        if os.path.isdir(os.path.join(path_input, name))
+    ]
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..c5828ff
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,2 @@
+minio==7.1.15
+tqdm==4.64.1
\ No newline at end of file
diff --git a/setup.py b/setup.py
index 8a45542..868136b 100644
--- a/setup.py
+++ b/setup.py
@@ -1,20 +1,45 @@
 import setuptools
+import subprocess
+import os
+
+remote_version = (
+    subprocess.run(["git", "describe", "--tags"], stdout=subprocess.PIPE)
+    .stdout.decode("utf-8")
+    .strip()
+)
+
+if "-" in remote_version:
+    # when not on tag, git describe outputs: "1.3.3-22-gdf81228"
+    # pip has gotten strict with version numbers
+    # so change it to: "1.3.3+22.git.gdf81228"
+    # See: https://peps.python.org/pep-0440/#local-version-segments
+    v, i, s = remote_version.split("-")
+    remote_version = v + "+" + i + ".git." + s
+
+assert "-" not in remote_version
+assert "." in remote_version
 
 with open("README.md", "r") as f:
     long_description = f.read()
 
 setuptools.setup(
-    name="py_utility",
-    version="1.0.0",
+    name="pylabtools",
+    version=remote_version,
     author="Tinnawong saelao",
     author_email="tinnawong2010@hotmail.com",
-    description="python utility",
+    description="python utility for research",
     long_description=long_description,
     long_description_content_type="text/markdown",
+    url="https://github.com/tinnawong/pylabtools",
     packages=setuptools.find_packages(),
+    package_data={"pylabtools": ["VERSION"]},
+    include_package_data=True,
     classifiers=[
         "Programming Language :: Python :: 3",
         "Operating System :: OS Independent",
     ],
-    install_requires= [],
-)
+    install_requires=[
+        "minio==7.1.15",
+        "tqdm==4.64.1"
+    ],
+)
\ No newline at end of file
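The version logic in `setup.py` is easiest to see with a worked example: when HEAD is 22 commits past tag `1.3.3`, `git describe --tags` prints `1.3.3-22-gdf81228`, which pip rejects. The same transformation, extracted into a standalone sketch:

```python
def normalize(described: str) -> str:
    # same transformation setup.py applies to `git describe --tags`
    if "-" in described:
        tag, commits, sha = described.split("-")
        return tag + "+" + commits + ".git." + sha
    return described

print(normalize("1.3.3-22-gdf81228"))  # -> "1.3.3+22.git.gdf81228"
print(normalize("1.3.3"))              # exactly on a tag: unchanged
```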
diff --git a/tests/__init__.py b/tests/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/tests/test_file_wrapper.py b/tests/test_file_wrapper.py
new file mode 100644
index 0000000..222bcbd
--- /dev/null
+++ b/tests/test_file_wrapper.py
@@ -0,0 +1,50 @@
+import unittest
+from unittest.mock import mock_open, patch
+from pylabtools import file_wrapper as fw
+
+class TestFileFunctions(unittest.TestCase):
+
+    def test_read_file_all_text(self):
+        m = mock_open(read_data="test content")
+        with patch("builtins.open", m):
+            content = fw.read_file_all_text("fakepath.txt")
+        self.assertEqual(content, "test content")
+
+    def test_stream_file_by_line(self):
+        m = mock_open(read_data="line1\nline2\nline3")
+        with patch("builtins.open", m):
+            lines = list(fw.stream_file_by_line("fakepath.txt"))
+        self.assertEqual(lines, ["line1", "line2", "line3"])
+
+    def test_write_text_to_file(self):
+        m = mock_open()
+        with patch("builtins.open", m):
+            fw.write_text_to_file("fakepath.txt", "test content")
+        m.assert_called_once_with("fakepath.txt", "w", encoding="utf-8")
+        handle = m()
+        handle.write.assert_called_once_with("test content")
+
+    def test_read_json_config(self):
+        mock_json_content = '{"key": "value"}'
+        m = mock_open(read_data=mock_json_content)
+        with patch("builtins.open", m), patch("json.load", return_value={"key": "value"}) as mock_json:
+            result = fw.read_json_config("fakepath.json")
+        self.assertEqual(result, {"key": "value"})
+        mock_json.assert_called_once()
+
+    def test_get_file_name_without_extension(self):
+        result = fw.get_file_name_without_extension("directory/filename.extension")
+        self.assertEqual(result, "filename")
+
+    def test_get_file_name(self):
+        result = fw.get_file_name("directory/filename.extension", tail="_tail", set_extension=".newext")
+        self.assertEqual(result, "filename_tail.newext")
+
+        result_no_ext = fw.get_file_name("directory/filename.extension", without_extension=True)
+        self.assertEqual(result_no_ext, "filename")
+
+        result_original_ext = fw.get_file_name("directory/filename.extension", tail="_tail")
+        self.assertEqual(result_original_ext, "filename_tail.extension")
+
+if __name__ == "__main__":
+    unittest.main()
diff --git a/tests/test_log_wrapper.py b/tests/test_log_wrapper.py
new file mode 100644
index 0000000..1abdf13
--- /dev/null
+++ b/tests/test_log_wrapper.py
@@ -0,0 +1,49 @@
+import unittest
+from unittest.mock import patch, mock_open
+import logging
+from pylabtools import log_wropper as lw
+
+class TestLoggerSetup(unittest.TestCase):
+
+    def setUp(self):
+        # Clear existing handlers
+        logging.getLogger().handlers = []
+
+        # Initialize a logger setup instance before each test
+        self.logger_setup = lw.LoggerSetup()
+
+    def test_initialization(self):
+        self.assertEqual(self.logger_setup.logger.level, logging.DEBUG)
+        self.assertIsInstance(self.logger_setup.formatter, lw.JSONFormatter)
+
+    @patch('logging.FileHandler._open', mock_open())
+    def test_add_file_handler(self):
+        self.logger_setup.add_file_handler(log_file_path="test.log")
+        self.assertEqual(len(self.logger_setup.logger.handlers), 1)
+        self.assertIsInstance(self.logger_setup.logger.handlers[0], logging.FileHandler)
+
+    @patch.object(logging.StreamHandler, 'emit')
+    def test_add_console_handler(self, mock_emit):
+        self.logger_setup.add_console_handler()
+        log_message = "This is a test message."
+        logger = self.logger_setup.get_logger()
+        logger.info(log_message)  # Make sure to log a message!
+
+        mock_emit.assert_called()
+        # The emit method takes a LogRecord as argument. So, you should access its `msg` attribute.
+        self.assertIn(log_message, mock_emit.call_args[0][0].msg)
+
+    def test_set_custom_format(self):
+        custom_format = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
+        self.logger_setup.add_console_handler()
+        self.logger_setup.set_custom_format(custom_format)
+
+        for handler in self.logger_setup.logger.handlers:
+            self.assertEqual(handler.formatter, custom_format)
+
+    def test_get_logger(self):
+        logger = self.logger_setup.get_logger()
+        self.assertEqual(logger, self.logger_setup.logger)
+
+if __name__ == "__main__":
+    unittest.main()
diff --git a/tests/test_path_wrapper.py b/tests/test_path_wrapper.py
new file mode 100644
index 0000000..8184d25
--- /dev/null
+++ b/tests/test_path_wrapper.py
@@ -0,0 +1,50 @@
+import os
+import tempfile
+import unittest
+from pylabtools import path_wrapper as pw
+class TestFileOperations(unittest.TestCase):
+
+    def setUp(self):
+        # Creating a temporary directory for testing
+        self.test_dir = tempfile.TemporaryDirectory()
+
+        # Test file and directory
+        self.test_file_path = os.path.join(self.test_dir.name, "test.txt")
+        with open(self.test_file_path, 'w') as f:
+            f.write("test content")
+
+        self.test_subdir = os.path.join(self.test_dir.name, "subdir")
+        os.makedirs(self.test_subdir)
+
+    def tearDown(self):
+        # Cleaning up the temporary directory
+        self.test_dir.cleanup()
+
+    def test_get_all_files(self):
+        # Recursive search
+        files = pw.get_all_files(self.test_dir.name, recursive=True)
+        self.assertIn(self.test_file_path, files)
+
+        # Non-recursive search
+        files = pw.get_all_files(self.test_dir.name, recursive=False)
+        self.assertIn(self.test_file_path, files)
+        self.assertNotIn(self.test_subdir, files)
+
+        # Filter by extension
+        files = pw.get_all_files(self.test_dir.name, endswith=".txt", recursive=False)
+        self.assertIn(self.test_file_path, files)
+
+    def test_get_current_folder_name(self):
+        folder_name = pw.get_current_folder_name(self.test_file_path)
+        self.assertEqual(folder_name, os.path.basename(self.test_dir.name))
+
+    def test_get_previous_path(self):
+        prev_path = pw.get_previous_path(self.test_file_path)
+        self.assertEqual(prev_path, self.test_dir.name)
+
+    def test_get_all_directories(self):
+        dirs = pw.get_all_directories(self.test_dir.name)
+        self.assertIn(self.test_subdir, dirs)
+
+if __name__ == "__main__":
+    unittest.main()
diff --git a/utils.py b/utils.py
deleted file mode 100644
index 790409c..0000000
--- a/utils.py
+++ /dev/null
@@ -1,135 +0,0 @@
-import logging
-import os
-import re
-import codecs
-import json
-import pathlib
-
-
-
-def read_file_to_text(path: str):
-    with codecs.open(path, "r", encoding='utf-8') as f:
-        text = f.read()
-    return text
-
-
-def write_text(path_file="output.txt", content="", encoding="utf-8"):
-    with codecs.open(path_file, "w", encoding=encoding) as f:
-        f.write(content)
-
-
-def get_all_files(path_input: str, endswith=".txt"):
-    """
-    Read every file under the given path (descending into subfolders as well).
-    """
-    if os.path.isfile(path_input):
-        return [path_input]
-    buffer_files = []
-    for root, dirs, files in os.walk(path_input):
-        for file in files:
-            if file.endswith(endswith):
-                buffer_files.append(os.path.join(root, file))
-    return buffer_files
-
-
-def normalize_text_transcription(text: str):
-    """
-    Remove words that are enclosed in brackets.
-    """
-    regex = r"\[\S+]|\([ก-ฮ][\S]+\)"
-    pattern = re.findall(regex, text)
-    print(pattern)
-    text_clearn = re.sub(regex, "", text)
-    return text_clearn
-
-
-def normalize_transcription(path_input: str, path_output: str):
-    list_path = get_all_files(path_input)
-    text_removed_buffer = []
-    for p in list_path:
-        print(p)
-        text = read_file_to_text(p)
-        text_removed = normalize_text_transcription(text)
-        text_removed_buffer.append(text_removed)
-    total_text = "\n".join(text_removed_buffer)
-    with codecs.open(os.path.join(path_output, "transcription.txt"), "w", encoding="utf-8")as f:
-        f.write(total_text)
-
-
-def read_file_config(path_config: str):
-    with codecs.open(path_config, "r", encoding="utf-8") as f:
-        config = f.read()
-    preprocess_config = json.loads(config)
-    return preprocess_config
-
-
-def get_file_name(path_file: str, tail="", without_extension=False, set_extension=None):
-    """
-    For changing a file's
-    extension.
-    """
-    if without_extension and set_extension:
-        log_level(
-            1, "parameter without_extension is true but set extension", logging.WARNING)
-    if set_extension == None:
-        set_extension = pathlib.Path(path_file).suffix
-    if without_extension:
-        return pathlib.Path(path_file).stem+"%s" % (tail)
-    return pathlib.Path(path_file).stem+"%s%s" % (tail, set_extension)
-
-
-def get_current_folder_name(path_file: str):
-    if os.path.isfile(path_file):
-        return os.path.basename(os.path.dirname(path_file))
-    return os.path.basename((os.path.abspath(path_file)))
-
-
-def get_previous_path(dir: str, previous=1):
-    if previous < 1:
-        return dir
-    dir = os.path.abspath(dir)
-    for i in range(previous):
-        state_dir = dir
-        dir = os.path.dirname(dir)
-        if state_dir == dir:
-            break
-    return dir
-
-
-def search_word_in_all_files(path_corpus: str, word: str):
-    list_file = get_all_files(path_corpus)
-    for p in list_file:
-        text = read_file_to_text(p)
-        if text.find(word) != -1:
-            print(p)
-
-
-def delete_all_files(path_dir: str, isConfirm=True):
-    list_path = get_all_files(path_dir)
-    for p in list_path:
-        print("path file:", p)
-    print("total files :",len(list_path))
-    if isConfirm:
-        ch = input("Do you want to delete all files? (y/n):")
-        if ch == "y":
-            for p in list_path:
-                if os.path.exists(p):
-                    os.remove(p)
-                else:
-                    print("The file does not exist :", p)
-    else:
-        for p in list_path:
-            if os.path.exists(p):
-                os.remove(p)
-            else:
-                print("The file does not exist :", p)
-
-
-if __name__ == '__main__':
-    # path_corpus = "transcription"
-    # path_output = "./corpus/transcription/"
-    # normalize_transcription(path_corpus,path_output)
-    path_file = "D:/python/ngram_corrector/corpus/train and test/Corpus 5 Million/novel/novel_00107.ss.txt"
-    log_level(1, "sdfkjsdkf")
-    log_level(2, "sdfkjsdkf")
-    print(get_file_name(path_file, tail="_sdfkit", without_extension=True))
diff --git a/utils_test.py b/utils_test.py
deleted file mode 100644
index 1cd8125..0000000
--- a/utils_test.py
+++ /dev/null
@@ -1,28 +0,0 @@
-import unittest
-from unittest.case import TestCase
-from py_utility import *
-
-
-class TestTinUtility(unittest.TestCase):
-    def test_get_file_name(self):
-        path_test = "Corpus 5 Million/novel/novel.txt"
-        cases = [
-            # parameter : tail="", without_extension=False, set extension=None
-            ["",False,None,"novel.txt"],#000
-            ["",False,".csv","novel.csv"],#001
-            ["",True,None,"novel"],#010
-            ["",True,".csv","novel"],#011
-            ["_test",False,None,"novel_test.txt"],#100
-            ["_test",False,".csv","novel_test.csv"],#101
-            ["_test",True,None,"novel_test"],#110
-            ["_test",True,".csv","novel_test"],#111
-        ]
-        for i,case in enumerate(cases):
-            file_name = get_file_name_without_extension(path_test,tail=case[0],without_extension=case[1],set_extension=case[2])
-            try:
-                self.assertEqual(case[3],file_name)
-            except:
-                print("case {} :{} != {}".format(i+1,case[3],file_name))
-
-if __name__ == '__main__':
-    unittest.main()
\ No newline at end of file