Skip to content

Commit

Permalink
add map name user
Browse files Browse the repository at this point in the history
  • Loading branch information
mrzaizai2k committed Sep 17, 2024
1 parent 272d7a9 commit fde2848
Show file tree
Hide file tree
Showing 5 changed files with 196 additions and 22 deletions.
3 changes: 2 additions & 1 deletion config/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -55,4 +55,5 @@ excel:
excel_file_path: data/user_template.xlsm
sheet_name: April 24_0
nachname_key: 'nachname'
vorname_key: 'vorname'
vorname_key: 'vorname'
name_thresh: 20
4 changes: 3 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,6 @@ pymongo
uvicorn
pytesseract
pydantic
openpyxl
openpyxl
rank_bm25
fuzzywuzzy
145 changes: 136 additions & 9 deletions src/excel_export.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,29 @@
sys.path.append("")

import openpyxl
from fuzzywuzzy import fuzz, process

from src.Utils.utils import read_config

class ExcelProcessor:
def __init__(self, config_path: str = None, config: dict = None):
    """
    Load the 'excel' configuration section and open the configured workbook.

    Exactly one of the two arguments must be supplied.

    :param config_path: Path to a YAML configuration file containing an 'excel' section.
    :param config: Already-loaded configuration dictionary containing an 'excel' section.
    :raises ValueError: If both or neither of 'config_path' and 'config' are provided.
    """
    # Use the same truthiness test for validation and for dispatch, so an
    # empty path or empty dict is rejected here instead of silently leaving
    # self.config unset and failing later with an AttributeError.
    has_path = bool(config_path)
    has_config = bool(config)
    if has_path == has_config:
        raise ValueError("Either 'config_path' or 'config' must be provided, but not both or neither.")

    # Always define the attribute, even when the configuration came from a dict.
    self.config_path = config_path if has_path else None
    if has_path:
        self.config = read_config(path=self.config_path)['excel']
    else:
        self.config = config['excel']

    # Load the Excel file and sheet (keep_vba=True is passed so macros in the
    # .xlsm template are retained by openpyxl).
    self.workbook = openpyxl.load_workbook(self.config['excel_file_path'], keep_vba=True)
    self.sheet = self.workbook[self.config['sheet_name']]

    # Header cell positions; filled in by _find_nachname_vorname_positions().
    self.nachname_position = None
    self.vorname_position = None

def find_positions(self):
def _find_nachname_vorname_positions(self):
"""Find the row and column positions of 'nachname' and 'vorname'."""
nachname_key = self.config['nachname_key'].lower()
vorname_key = self.config['vorname_key'].lower()
Expand All @@ -41,9 +49,9 @@ def find_positions(self):
if self.nachname_position[1] >= self.vorname_position[1]:
raise ValueError(f"'{nachname_key}' must be in a column less than '{vorname_key}'.")

def get_user_name(self):
def get_user_names(self):
"""Returns a list of tuples (nachname, vorname) starting from the row after the header."""
self.find_positions() # Ensure the positions are set
self._find_nachname_vorname_positions() # Ensure the positions are set

row_num = self.nachname_position[0] + 1
values_nachname_and_vorname = []
Expand All @@ -65,13 +73,132 @@ def get_user_name(self):

return values_nachname_and_vorname

def get_user_email(self, nachname: str, vorname: str, domain: str = "gmail.com") -> str:
    """
    Generate the user email based on the first two characters of the last and first names.

    :param nachname: Last name of the user (may be None or a non-string cell value).
    :param vorname: First name of the user (may be None or a non-string cell value).
    :param domain: Mail domain appended after '@'; defaults to 'gmail.com'.
    :return: Email address as a string.
    """
    def _prefix(value) -> str:
        # Spreadsheet cells can hold None or non-string values; coerce to text
        # and drop surrounding whitespace so it never ends up in the address.
        if value is None:
            return ''
        return str(value).strip()[:2].lower()

    return f"{_prefix(nachname)}{_prefix(vorname)}@{domain}"

def get_user_info(self):
    """
    Build one record per user found by get_user_names().

    :return: List of dictionaries of the form
             {'name': (nachname, vorname), 'email': email}, in the order
             returned by get_user_names().
    """
    # The email is derived from each name pair via get_user_email().
    return [
        {
            'name': (nachname, vorname),
            'email': self.get_user_email(nachname, vorname),
        }
        for nachname, vorname in self.get_user_names()
    ]

def get_sheet_names(self):
    """
    Return the names of all sheets in the loaded workbook.

    :return: The workbook's 'sheetnames' value.
    """
    return self.workbook.sheetnames



class FuzzyNameMatcher:
    """
    Match a noisy (e.g. OCR-produced) name string against a list of known names.

    Every known name is indexed in both "last first" and "first last" order so
    the query may use either ordering.
    """

    def __init__(self, names):
        """
        Initialize the matcher with the list of names.

        :param names: List of tuples (nachname, vorname)
        """
        self.names = names
        # Lower-cased search corpus: (text, index into self.names) pairs.
        self.canonical_names = self._preprocess_names()

    @staticmethod
    def _normalize(part) -> str:
        """Return a name part as stripped lower-case text ('' for None)."""
        # Spreadsheet cells may be None or non-string values.
        return '' if part is None else str(part).strip().lower()

    def _preprocess_names(self):
        """
        Preprocess the names by creating both last-first and first-last formats.

        :return: List of (text, index) tuples, two per input name.
        """
        corpus = []
        for idx, (last_name, first_name) in enumerate(self.names):
            last = self._normalize(last_name)
            first = self._normalize(first_name)
            # strip() removes the dangling separator when one part is empty.
            corpus.append((f"{last} {first}".strip(), idx))
            corpus.append((f"{first} {last}".strip(), idx))
        return corpus

    def find_best_match(self, ocr_output):
        """
        Find the closest match to the OCR output using fuzzy matching.

        :param ocr_output: OCR string (possibly incorrect)
        :return: Tuple (index in the original list, original name tuple,
                 similarity score), or (None, None, 0) when nothing scores
                 above zero or the query is missing/blank.
        """
        # A missing or blank query cannot match anything meaningful.
        if not isinstance(ocr_output, str) or not ocr_output.strip():
            return None, None, 0

        query = ocr_output.lower()
        best_idx, best_score = None, 0

        for name, idx in self.canonical_names:
            score = fuzz.ratio(query, name)
            # Strict '>' keeps the first entry among equal scores.
            if score > best_score:
                best_idx, best_score = idx, score
                if score == 100:
                    # Nothing can beat a perfect score; stop scanning.
                    break

        if best_idx is None:
            return None, None, 0
        return best_idx, self.names[best_idx], best_score

def get_full_name(name_tuple):
    """
    Convert a tuple into a full name string.

    :param name_tuple: Tuple containing parts of a name (e.g., last_name, first_name, etc.);
                       may be None or contain None/blank parts.
    :return: Full name as a single string with space-separated values; missing
             or blank parts are skipped, and '' is returned when nothing remains.
    """
    if not name_tuple:
        return ''
    # Skip None so it is never rendered as the literal text "None".
    parts = (str(part).strip() for part in name_tuple if part is not None)
    return ' '.join(part for part in parts if part)




if __name__ == "__main__":
    # Demo: load the configured workbook, list its users, then fuzzy-match a
    # few noisy OCR spellings against the extracted names.
    config_path = 'config/config.yaml'
    config = read_config(config_path)
    processor = ExcelProcessor(config=config)
    names = processor.get_user_names()

    print(names)
    user_info = processor.get_user_info()
    for info in user_info:
        print(info)

    print(processor.get_sheet_names())

    matcher = FuzzyNameMatcher(names)

    # OCR output (potentially with errors and mixed order)
    ocr_output = ["Téuuley Divl", "Tuuulev Dirk", "Tümmeler Dirk", "Dirk Tuuulev", "Divl Téuuley"]

    for ocr_name in ocr_output:
        best_idx, best_match, best_score = matcher.find_best_match(ocr_name)
        print(f"OCR Output: {ocr_name}")
        if best_match:
            print(f"Best Match: {best_match} at index {best_idx} with score {best_score}")
            full_name = get_full_name(best_match)
            print(full_name)
        else:
            print("No match found")

14 changes: 8 additions & 6 deletions src/invoice_extraction.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,9 @@ def extract_invoice_info(base64_img:str, ocr_reader:OcrReader, invoice_extractor
invoice_info = invoice_extractor.extract_invoice(ocr_text=ocr_result['text'], image=rotate_image,
invoice_template=invoice_template)
print(invoice_info)
invoice_info = validate_invoice(invoice_info, invoice_type)
invoice_info = validate_invoice(invoice_info=invoice_info,
invoice_type=invoice_type, config=config)

result['translator'] = ocr_reader['translator']
result['ocr_detector'] = ocr_reader['ocr_detector']
result['invoice_info'] = invoice_info
Expand All @@ -128,19 +130,19 @@ def extract_invoice_info(base64_img:str, ocr_reader:OcrReader, invoice_extractor



def validate_invoice(invoice_info:dict, invoice_type:str) ->dict:
def validate_invoice(invoice_info:dict, invoice_type:str, config:dict) ->dict:

if invoice_type == "invoice 1":
valid_invoice = validate_invoice_1(invoice_info)
valid_invoice = validate_invoice_1(invoice_data=invoice_info, config=config)
full_invoice = Invoice1(invoice_info=valid_invoice['invoice_info'])


elif invoice_type == "invoice 2":
valid_invoice = validate_invoice_2(invoice_info)
valid_invoice = validate_invoice_2(invoice_data=invoice_info, config=config)
full_invoice = Invoice2(invoice_info=valid_invoice['invoice_info'])

elif invoice_type == "invoice 3":
valid_invoice = validate_invoice_3(invoice_info)
valid_invoice = validate_invoice_3(invoice_data=invoice_info)
full_invoice = Invoice3(invoice_info=valid_invoice['invoice_info'])

full_invoice_dict = full_invoice.model_dump(exclude_unset=False)
Expand All @@ -153,7 +155,7 @@ def validate_invoice(invoice_info:dict, invoice_type:str) ->dict:

ocr_reader = OcrReader(config_path=config_path, translator=GoogleTranslator())
invoice_extractor = OpenAIExtractor(config_path=config_path)
img_path = "test/images/009_2.png"
img_path = "test/images/005_1.png"
base64_img = convert_img_path_to_base64(img_path)
result = extract_invoice_info(base64_img=base64_img, ocr_reader=ocr_reader,
invoice_extractor=invoice_extractor, config=config)
Expand Down
52 changes: 47 additions & 5 deletions src/validate_invoice.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,42 @@
from pydantic import BaseModel, Field, model_validator
from typing import List, Optional, Any, Union
from datetime import date, time, datetime


from src.excel_export import FuzzyNameMatcher, ExcelProcessor, get_full_name


def preprocess_name(name: str) -> str:
    """
    Preprocess the OCR name by replacing special symbols like ,./! with spaces,
    collapsing whitespace and converting to lowercase.

    :param name: OCR name string (None or non-string input yields '').
    :return: Preprocessed name.
    """
    if not isinstance(name, str):
        return ''
    # Replace each run of non-alphanumeric characters (punctuation, underscore,
    # whitespace) with one space rather than deleting it, so "Schmidt,Timo"
    # stays two tokens instead of collapsing into "schmidttimo".
    return re.sub(r'[\W_]+', ' ', name).strip().lower()

def map_name(ocr_name: str, config: dict):
    """
    Map OCR name to the closest match from the Excel data.

    :param ocr_name: String from OCR output (None/blank yields '').
    :param config: Configuration dictionary; reads config['excel']['name_thresh']
                   as the minimum accepted similarity score.
    :return: Best matching full name or an empty string if no match is found.
    """
    # Nothing to match: skip loading the workbook entirely.
    if not isinstance(ocr_name, str) or not ocr_name.strip():
        return ""

    # Preprocess the OCR name before matching
    ocr_name = preprocess_name(ocr_name)
    if not ocr_name:
        return ""

    # Initialize the processor and matcher
    # NOTE(review): the workbook is re-opened on every call; consider caching
    # the name list if this is called once per invoice field.
    processor = ExcelProcessor(config=config)
    user_names = processor.get_user_names()

    matcher = FuzzyNameMatcher(user_names)
    _, best_match, best_score = matcher.find_best_match(ocr_name)

    # Check best_match explicitly: with a threshold <= 0 a "no match" result
    # (score 0) would otherwise pass the score comparison.
    if best_match is None or best_score < config['excel']['name_thresh']:
        return ""
    return get_full_name(best_match)

def strip_strings(value):
if isinstance(value, str):
return value.strip()
Expand Down Expand Up @@ -98,7 +132,7 @@ def normalize_float(value):
except (ValueError, TypeError):
return None

def validate_invoice_1(invoice_data: dict) -> dict:
def validate_invoice_1(invoice_data: dict, config:dict) -> dict:

# Recursive function to apply normalizations and validations to the data
def validate_and_normalize(data: Any, reference_year=None):
Expand All @@ -115,6 +149,9 @@ def validate_and_normalize(data: Any, reference_year=None):

if 'break_time' in key:
data[key] = normalize_float(data[key])

if key == 'name':
data[key] = map_name(value, config)

# Recursively normalize nested dictionaries or lists
if isinstance(value, dict) or isinstance(value, list):
Expand All @@ -128,7 +165,8 @@ def validate_and_normalize(data: Any, reference_year=None):
# Call the validation and normalization function
return validate_and_normalize(invoice_data)

def validate_invoice_2(invoice_data: dict) -> dict:

def validate_invoice_2(invoice_data: dict, config:dict) -> dict:

# Function to normalize titles in fixed lines
def normalize_title(title: str) -> str:
Expand Down Expand Up @@ -171,6 +209,10 @@ def validate_and_normalize(data: Any, reference_year=None):
if 'amount' in key:
data[key] = normalize_float(data[key])

# If the key is 'name', apply map_name to standardize it
if key == 'name':
data[key] = map_name(value, config)

# If the key is 'fixed_lines', normalize the title and payment_method
if key == 'fixed_lines':
for line in data[key]:
Expand Down Expand Up @@ -453,7 +495,7 @@ class Invoice3(BaseModel):
}
json_2 = {
"invoice_info": {
"name": "Schmidt, Timo",
"name": "Tüümler, Dirk",
"project_number": "V123023",
"is_in_egw": True,
"currency": "EUR",
Expand Down

0 comments on commit fde2848

Please sign in to comment.