Skip to content

Commit

Permalink
Merge pull request #31 from troykelly/29-copy-output-file-if-linking-…
Browse files Browse the repository at this point in the history
…fails

29 copy output file if linking fails
  • Loading branch information
troykelly authored May 21, 2024
2 parents e162c8f + d2554d3 commit cc386ed
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 36 deletions.
6 changes: 5 additions & 1 deletion lexicon.json
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
{
"direct": {
"direct_sensitive": {
"abc": "a b c",
"123": "one two three",
"NSW": "New South Wales",
"Anthony Albanese": "Anthony /ɑɫbɑˈneɪzi/"
},
"direct_insensitive": {
"quasar": "ˈkweɪzɑɹ",
"quasars": "ˈkweɪzɑɹz"
},
"regex": {
"(?P<number>\\d+\\.?\\d*)°C": "\\g<number> degrees Celsius",
"(?P<number>\\d+\\.?\\d*)°F": "\\g<number> degrees Fahrenheit",
Expand Down
107 changes: 72 additions & 35 deletions src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,10 +108,20 @@ def validate_cron(cron_expr):
except:
return False

def check_and_create_link_path(link_path):
"""Check if the symbolic link path is writable and create the directory if it doesn't exist."""
def check_and_create_link_path(source, link_path):
"""Attempt to create a symbolic link, fallback to copying if linking fails.
Args:
source (str): The source file path.
link_path (str): The target symbolic link path.
Returns:
bool: True if either linking or copying succeeds, False otherwise.
"""
source = Path(source)
link_path = Path(link_path)
directory = link_path.parent

if not directory.exists():
try:
directory.mkdir(parents=True, exist_ok=True)
Expand All @@ -120,13 +130,28 @@ def check_and_create_link_path(link_path):
logging.error(f"Failed to create directory for link path '{directory}': {e}")
return False

# Check if we can write to the directory
if not os.access(directory, os.W_OK):
logging.error(f"Cannot write to the directory '{directory}' for link path.")
return False

return True
# Attempt to create a symbolic link
try:
if link_path.exists() or link_path.is_symlink():
link_path.unlink()
link_path.symlink_to(source)
logging.info(f"Created symbolic link '{link_path}' -> '{source}'")
return True
except Exception as e:
logging.warning(f"Failed to create symbolic link '{link_path}' -> '{source}': {e}")

# Fallback to copying the file
try:
if link_path.exists():
link_path.unlink()
import shutil
shutil.copy2(source, link_path)
logging.info(f"Copied '{source}' to '{link_path}'")
return True
except Exception as e:
logging.error(f"Failed to copy '{source}' to '{link_path}': {e}")
return False

def load_lexicon(file_path):
"""Load lexicon dictionary from a JSON file.
Expand All @@ -138,8 +163,7 @@ def load_lexicon(file_path):
"""
try:
with open(file_path, 'r', encoding='utf-8') as file:
lexicon = json.load(file)
return lexicon
return json.load(file)
except (FileNotFoundError, json.JSONDecodeError) as e:
logging.error(f"Error loading lexicon file '{file_path}': {e}")
return {}
Expand All @@ -154,13 +178,25 @@ def apply_lexicon(text, lexicon):
Returns:
str: The text after applying lexicon conversions.
"""
# Apply direct text to translation mappings
for original, translation in lexicon.get("direct", {}).items():
import re

# Sort and apply case-sensitive direct text to translation mappings (prioritize longer matches)
direct_sensitive = sorted(lexicon.get("direct_sensitive", {}).items(), key=lambda x: -len(x[0]))
for original, translation in direct_sensitive:
text = text.replace(original, translation)

# Sort and apply case-insensitive direct text to translation mappings (prioritize longer matches)
direct_insensitive = sorted(lexicon.get("direct_insensitive", {}).items(), key=lambda x: -len(x[0]))
for original, translation in direct_insensitive:
pattern = re.compile(re.escape(original), re.IGNORECASE)
text = pattern.sub(lambda m: translation, text)

# Apply regex patterns with named groups
for pattern, translation in lexicon.get("regex", {}).items():
text = re.sub(pattern, translation, text)
try:
text = re.sub(pattern, translation, text)
except re.error as e:
logging.error(f"Regex error with pattern '{pattern}': {e}")

return text

Expand All @@ -178,18 +214,19 @@ def process_text_for_tts(text):
return apply_lexicon(text, lexicon)

def parse_rss_feed(feed_url, config):
"""Parses the RSS feed, filtering out ignored articles using regex patterns.
"""Parses the RSS feed, filtering out ignored articles using regex patterns
and sorts the articles from most recent to oldest.
Args:
feed_url (str): URL of the RSS feed.
config (dict): Configuration dictionary for field mappings and ignore patterns.
Returns:
list: List of dictionaries, each representing a news item.
list: List of dictionaries, each representing a news item, sorted by most recent.
"""
feed = feedparser.parse(feed_url, agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0.0.0 Safari/537.36')
parsed_items = []

for entry in feed.entries:
logging.info(f"Processing entry: {entry.title}")
if any(re.search(pattern, entry.title) for pattern in config['IGNORE_PATTERNS']):
Expand All @@ -200,22 +237,25 @@ def parse_rss_feed(feed_url, config):
for config_field, feed_field in config.items()
if config_field not in ['IGNORE_PATTERNS', 'CATEGORY']}
item['CATEGORY'] = ', '.join(categories) if categories else 'General'

# Ensure we correctly parse the published date
if hasattr(entry, 'published_parsed'):
published_date = datetime(*entry.published_parsed[:6])
item['PUBLISHED'] = published_date
elif 'published' in entry:
try:
published_date = datetime.strptime(entry.published, '%a, %d %b %Y %H:%M:%S %Z')
item['PUBLISHED'] = published_date
except ValueError:
published_date = 'Unknown'
item['PUBLISHED'] = None
else:
published_date = 'Unknown'

item['PUBLISHED'] = published_date

item['PUBLISHED'] = None

parsed_items.append(item)

# Sort the parsed items by the published date in descending order
parsed_items.sort(key=lambda x: x['PUBLISHED'] or datetime.min, reverse=True)

return parsed_items

def fetch_bom_data(product_id):
Expand Down Expand Up @@ -586,12 +626,15 @@ def generate_speech(news_script_chunk, api_key, voice, quality, output_format):
str: Path to the temporary file containing the generated audio.
"""
client = OpenAI(api_key=api_key)

processed_text = process_text_for_tts(news_script_chunk)
logging.info(f"Processed text for TTS: {processed_text}")

try:
response = client.audio.speech.create(
model=quality,
voice=voice,
input=news_script_chunk,
input=processed_text,
response_format=output_format
)

Expand Down Expand Up @@ -850,7 +893,7 @@ def generate_news_audio():

if sfx_file:
if current_index + 1 < len(script_sections):
speech_text = process_text_for_tts(script_sections[current_index + 1])
speech_text = script_sections[current_index + 1]
speech_audio_file = generate_speech(speech_text, openai_api_key, tts_voice, tts_quality, output_format)
mixed_audio = generate_mixed_audio(sfx_file, speech_audio_file, timing_value)
final_audio += mixed_audio
Expand All @@ -859,14 +902,14 @@ def generate_news_audio():

if sfx_key == "OUTRO" and article_end_time is None:
article_end_time = total_duration
logging.info(f"Music bed to start at {article_end_time}.")
logging.info(f"Music bed to end at {article_end_time}.")

if sfx_key == "FIRST" and article_start_time is None:
timing_offset = 0
if timing_value.lower() != "none":
timing_offset = int(timing_value)
article_start_time = total_duration + timing_offset
logging.info(f"Music bed to end at {article_start_time}.")
logging.info(f"Music bed to start at {article_start_time}.")

total_duration += len(mixed_audio)
else:
Expand Down Expand Up @@ -935,16 +978,10 @@ def generate_news_audio():
output_link = os.getenv('NEWS_READER_OUTPUT_LINK', '').strip()

if output_link:
if check_and_create_link_path(output_link):
try:
if os.path.exists(output_link):
os.remove(output_link)
os.symlink(output_file_path, output_link)
logging.info(f"Created symbolic link '{output_link}' -> '{output_file_path}'")
except Exception as e:
logging.error(f"Failed to create symbolic link '{output_link}' to '{output_file_path}': {e}")
if check_and_create_link_path(output_file_path, output_link):
logging.info(f"Successfully created link or copied file to '{output_link}'")
else:
logging.error(f"Failed checks for symbolic link creation: {output_link}")
logging.error(f"Failed to create link or copy file to '{output_link}'")

def main():
"""Main function that fetches, parses, and processes the RSS feed into audio."""
Expand Down

0 comments on commit cc386ed

Please sign in to comment.