-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathfile_utils.py
112 lines (93 loc) · 3.48 KB
/
file_utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
# file_utils.py
import csv
import json
import os
import logging
logger = logging.getLogger(__name__)
def save_config(config_file, config_data):
    """
    Save the configuration data to a JSON file.

    The data is serialized to a string before the file is opened, so a
    serialization failure (for example a value JSON cannot represent)
    leaves an existing configuration file untouched instead of truncating
    it. Any failure is logged and not re-raised.

    Args:
        config_file (str): Path to the configuration file.
        config_data (dict): Dictionary containing configuration data.
    """
    try:
        # Serialize first: opening with "w" truncates the file immediately,
        # so doing it before a failing dump would destroy the old config.
        serialized = json.dumps(config_data, indent=4)
        with open(config_file, "w", encoding="utf-8") as f:
            f.write(serialized)
        logger.info(f"Configuration saved to {config_file}.")
    except Exception as e:
        # Best-effort save: report the problem but do not crash the caller.
        logger.error(f"Failed to save configuration: {e}")
def ensure_directories_exist():
    """
    Create the working directories used by the application.

    The directories ``logs``, ``ANKI`` and ``input_files`` are created
    relative to the current working directory. Directories that already
    exist are left as they are.
    """
    for directory in ("logs", "ANKI", "input_files"):
        os.makedirs(directory, exist_ok=True)
def get_default_input_files():
    """
    Return the names of the files stored in the 'input_files/' directory.

    The directory is resolved against the current working directory. Only
    entries that are files are returned (sub-directories are left out), as
    bare names without the directory part.

    Returns:
        list[str]: File names found in the directory.
    """
    base_dir = os.path.join(os.getcwd(), "input_files")
    names = []
    for entry in os.listdir(base_dir):
        candidate = os.path.join(base_dir, entry)
        if os.path.isfile(candidate):
            names.append(entry)
    logger.debug(f"Found input files: {names}")
    return names
def parse_input_file(input_file):
    """
    Parse a Markdown pipe table from a text file into a list of row dicts.

    Blank lines and lines without a pipe are ignored. The first line that
    contains a pipe is the header row; its non-empty cells become the
    dictionary keys. Delimiter rows such as ``|---|:---:|`` are skipped.
    Every other table line is split into cells (outer pipes are optional)
    and stored as ``{header: value}`` when its cell count matches the
    header count; otherwise a warning is logged and the line is dropped.

    Args:
        input_file (str): Path to the UTF-8 encoded input file.

    Returns:
        list[dict]: One dictionary per well-formed data row, in file order.
        If the file cannot be read, or the header row has no column names,
        the error is logged and an empty list is returned.
    """
    rows = []
    try:
        with open(input_file, "r", encoding="utf-8") as f:
            # Read everything up front so a decoding error cannot leave
            # the caller with a silently half-parsed table.
            lines = f.readlines()
        headers = None
        for raw_line in lines:
            line = raw_line.strip()
            if not line or "|" not in line:
                continue
            if headers is None:
                headers = [h.strip() for h in line.split("|") if h.strip()]
                if not headers:
                    raise ValueError("No valid headers found in the input file.")
                continue
            values = _split_table_cells(line)
            # Only a real delimiter row is skipped; a data row that merely
            # contains "---" inside a cell must be kept.
            if "---" in line and all(_is_delimiter_cell(v) for v in values):
                continue
            if len(values) == len(headers):
                rows.append(dict(zip(headers, values)))
            else:
                logger.warning(f"Mismatched row length: {values}")
    except Exception as e:
        # Best-effort parsing: report the problem and return what we have.
        logger.error(f"Error parsing input file: {e}")
    return rows


def _split_table_cells(line):
    """Split a stripped table line into stripped cells, ignoring one optional outer pipe per side."""
    if line.startswith("|"):
        line = line[1:]
    if line.endswith("|"):
        line = line[:-1]
    return [cell.strip() for cell in line.split("|")]


def _is_delimiter_cell(cell):
    """Return True if the cell holds only dashes and alignment colons, with at least one dash."""
    return "-" in cell and set(cell) <= {"-", ":"}
def validate_input_file(input_file):
    """
    Validate that the input file exists, is readable, and contains a Markdown pipe table.

    Blank lines are ignored everywhere, and lines without a pipe are
    ignored before the header and among the data rows. The first line
    containing a pipe is the header; the next non-blank line must be a
    separator containing ``---``. Every following table line must have as
    many cells as the header has column names. Cells may be empty and the
    outer pipes are optional. A header plus separator with no data rows
    is accepted.

    Args:
        input_file (str): Path to the input file.

    Returns:
        bool: True if the file is valid, False otherwise.

    Raises:
        FileNotFoundError: If ``input_file`` does not exist.
        PermissionError: If ``input_file`` is not readable.
    """
    if not os.path.exists(input_file):
        raise FileNotFoundError(f"Input file {input_file} does not exist.")
    if not os.access(input_file, os.R_OK):
        raise PermissionError(f"Input file {input_file} is not readable.")

    with open(input_file, "r", encoding="utf-8") as file:
        # Blank lines (including a trailing one at end of file) carry no
        # table content and must not make an otherwise valid table fail.
        lines = [text for text in (raw.strip() for raw in file) if text]

    # Anything before the first pipe line is treated as a preamble.
    header_index = next((i for i, text in enumerate(lines) if "|" in text), None)
    if header_index is None or header_index + 1 >= len(lines):
        # A valid table needs at least a header and a separator line.
        return False

    headers = lines[header_index]
    separator = lines[header_index + 1]
    if "---" not in separator:
        return False

    header_columns = [col.strip() for col in headers.split("|") if col.strip()]
    if not header_columns:
        return False
    expected = len(header_columns)

    for line in lines[header_index + 2:]:
        if "|" not in line:
            # Not a table row; ignored rather than rejected.
            continue
        # Count cells structurally so that empty cells still count.
        body = line[1:] if line.startswith("|") else line
        body = body[:-1] if body.endswith("|") else body
        if len(body.split("|")) != expected:
            return False
    return True