diff --git a/backend/editor/graph_db.py b/backend/editor/graph_db.py index af6c9f40..3246e76a 100644 --- a/backend/editor/graph_db.py +++ b/backend/editor/graph_db.py @@ -12,6 +12,7 @@ from .exceptions import TransactionMissingError log = logging.getLogger(__name__) +DEFAULT_DB = "neo4j" txn = contextvars.ContextVar("txn") @@ -29,7 +30,7 @@ async def TransactionCtx(): """ global txn, session try: - async with driver.session() as _session: + async with driver.session(database=DEFAULT_DB) as _session: txn_manager = await _session.begin_transaction() async with txn_manager as _txn: txn.set(_txn) @@ -86,5 +87,5 @@ def SyncTransactionCtx(): """ uri = settings.uri driver = neo4j.GraphDatabase.driver(uri) - with driver.session() as _session: + with driver.session(database=DEFAULT_DB) as _session: yield _session diff --git a/parser/openfoodfacts_taxonomy_parser/parser/logger.py b/parser/openfoodfacts_taxonomy_parser/parser/logger.py index 555c5bb8..881a4761 100644 --- a/parser/openfoodfacts_taxonomy_parser/parser/logger.py +++ b/parser/openfoodfacts_taxonomy_parser/parser/logger.py @@ -2,6 +2,11 @@ class ParserConsoleLogger: + @staticmethod + def ellipsis(text, max=20): + """Cut a text adding eventual ellipsis if we do not display it fully""" + return text[:max] + ("..." if len(text) > max else "") + def __init__(self): self.parsing_warnings = [] # Stores all warning logs self.parsing_errors = [] # Stores all error logs diff --git a/parser/openfoodfacts_taxonomy_parser/parser/parser.py b/parser/openfoodfacts_taxonomy_parser/parser/parser.py index d0af686d..a2c97255 100644 --- a/parser/openfoodfacts_taxonomy_parser/parser/parser.py +++ b/parser/openfoodfacts_taxonomy_parser/parser/parser.py @@ -1,26 +1,24 @@ +import collections import logging import os import sys +import timeit import iso639 -from neo4j import GraphDatabase, Session +from neo4j import GraphDatabase, Session, Transaction from .logger import ParserConsoleLogger from ..normalizer import normalizing from .taxonomy_parser import ( NodeType, PreviousLink, + Taxonomy, TaxonomyParser, NodeData, ChildLink, ) -def ellipsis(text, max=20): - """Cut a text adding eventual ellipsis if we do not display it fully""" - return text[:max] + ("..." 
if len(text) > max else "") - - class Parser: """Parse a taxonomy file and build a neo4j graph""" @@ -28,131 +26,195 @@ def __init__(self, session: Session): self.session = session self.parser_logger = ParserConsoleLogger() - def _create_headernode(self, header: list[str], multi_label: str): - """Create the node for the header""" - query = f""" - CREATE (n:{multi_label}:TEXT) - SET n.id = '__header__' - SET n.preceding_lines= $header - SET n.src_position= 1 - """ - self.session.run(query, header=header) + def _get_project_name(self, taxonomy_name: str, branch_name: str): + """Create a project name for given branch and taxonomy""" + return "p_" + taxonomy_name + "_" + branch_name - def _create_node(self, node_data: NodeData, multi_label: str): - """Run the query to create the node with data dictionary""" + def _create_other_node(self, tx: Transaction, node_data: NodeData, project_label: str): + """Create a TEXT, SYNONYMS or STOPWORDS node""" position_query = """ SET n.id = $id - SET n.is_before = $is_before SET n.preceding_lines = $preceding_lines SET n.src_position = $src_position """ - entry_query = "" if node_data.get_node_type() == NodeType.TEXT: - id_query = f" CREATE (n:{multi_label}:TEXT) \n " + id_query = f"CREATE (n:{project_label}:TEXT) \n" elif node_data.get_node_type() == NodeType.SYNONYMS: - id_query = f" CREATE (n:{multi_label}:SYNONYMS) \n " + id_query = f"CREATE (n:{project_label}:SYNONYMS) \n" elif node_data.get_node_type() == NodeType.STOPWORDS: - id_query = f" CREATE (n:{multi_label}:STOPWORDS) \n " + id_query = f"CREATE (n:{project_label}:STOPWORDS) \n" else: - id_query = f" CREATE (n:{multi_label}:ENTRY) \n " - position_query += " SET n.main_language = $main_language " - if node_data.parent_tag: - entry_query += " SET n.parents = $parent_tag \n" - for key in node_data.properties: - if key.startswith("prop_"): - entry_query += " SET n." + key + " = $" + key + "\n" - - for key in node_data.tags: - if key.startswith("tags_"): - entry_query += " SET n." 
+ key + " = $" + key + "\n" + raise ValueError(f"ENTRY nodes should not be passed to this function") + + entry_queries = [f"SET n.{key} = ${key}" for key in node_data.tags] + entry_query = "\n".join(entry_queries) + "\n" query = id_query + entry_query + position_query - self.session.run(query, node_data.to_dict()) + tx.run(query, node_data.to_dict()) - def _get_project_name(self, taxonomy_name: str, branch_name: str): - """Create a project name for given branch and taxonomy""" - return "p_" + taxonomy_name + "_" + branch_name + def _create_other_nodes(self, other_nodes: list[NodeData], project_label: str): + """Create all TEXT, SYNONYMS and STOPWORDS nodes""" + self.parser_logger.info("Creating TEXT, SYNONYMS and STOPWORDS nodes") + start_time = timeit.default_timer() - def _create_multi_label(self, taxonomy_name: str, branch_name: str) -> str: - """Create a combined label with taxonomy name and branch name""" - project_name = self._get_project_name(taxonomy_name, branch_name) - return project_name + ":" + ("t_" + taxonomy_name) + ":" + ("b_" + branch_name) - - def create_nodes(self, nodes: list[NodeData], multi_label: str): - """Adding nodes to database""" - self.parser_logger.info("Creating nodes") - for node in nodes: - if node.id == "__header__": - self._create_headernode(node.preceding_lines, multi_label) - else: - self._create_node(node, multi_label) + with self.session.begin_transaction() as tx: + for node in other_nodes: + self._create_other_node(tx, node, project_label) + + self.parser_logger.info( + f"Created {len(other_nodes)} TEXT, SYNONYMS and STOPWORDS nodes in {timeit.default_timer() - start_time} seconds" + ) + + def _create_entry_nodes(self, entry_nodes: list[NodeData], project_label: str): + """Create all ENTRY nodes in a single batch query""" + self.parser_logger.info("Creating ENTRY nodes") + start_time = timeit.default_timer() + + base_query = f""" + WITH $entry_nodes as entry_nodes + UNWIND entry_nodes as entry_node + CREATE (n:{project_label}:ENTRY) + SET n.id = entry_node.id + SET n.preceding_lines = entry_node.preceding_lines + SET n.src_position = entry_node.src_position + SET n.main_language = entry_node.main_language + """ + + # we don't know in advance which properties and tags + # we will encounter in the batch + # so we accumulate them in this set + seen_properties_and_tags = set() + + for entry_node in entry_nodes: + if entry_node.get_node_type() != NodeType.ENTRY: + raise ValueError(f"Only ENTRY nodes should be passed to this function") + seen_properties_and_tags.update(entry_node.tags) + seen_properties_and_tags.update(entry_node.properties) - def create_previous_link(self, previous_links: list[PreviousLink], multi_label: str): + additional_query = "\n" + "\n".join( + [f"SET n.{key} = entry_node.{key}" for key in seen_properties_and_tags] + ) + + query = base_query + additional_query + self.session.run(query, entry_nodes=[entry_node.to_dict() for entry_node in entry_nodes]) + + self.parser_logger.info( + f"Created {len(entry_nodes)} ENTRY nodes in {timeit.default_timer() - start_time} seconds" + ) + + def _create_previous_links(self, previous_links: list[PreviousLink], project_label: str): + """Create the 'is_before' relations between nodes""" self.parser_logger.info("Creating 'is_before' links") - for previous_link in previous_links: - id = previous_link["id"] - before_id = previous_link["before_id"] - - query = f""" - MATCH(n:{multi_label}) WHERE n.id = $id - MATCH(p:{multi_label}) WHERE p.id= $before_id - CREATE (p)-[r:is_before]->(n) - RETURN r - """ - 
results = self.session.run(query, id=id, before_id=before_id) - relation = results.values() - if len(relation) > 1: - self.parser_logger.error( - "2 or more 'is_before' links created for ids %s and %s, " - "one of the ids isn't unique", - id, - before_id, - ) - elif not relation[0]: - self.parser_logger.error("link not created between %s and %s", id, before_id) - - def create_child_link(self, child_links: list[ChildLink], multi_label: str): - """Create the relations between nodes""" + start_time = timeit.default_timer() + + # The previous links creation is batched in a single query + # We also use the ID index to speed up the MATCH queries + query = f""" + UNWIND $previous_links as previous_link + MATCH(n:{project_label}) USING INDEX n:{project_label}(id) + WHERE n.id = previous_link.id + MATCH(p:{project_label}) USING INDEX p:{project_label}(id) + WHERE p.id = previous_link.before_id + CREATE (p)-[relations:is_before]->(n) + WITH relations + UNWIND relations AS relation + RETURN COUNT(relation) + """ + result = self.session.run(query, previous_links=previous_links) + count = result.value()[0] + + self.parser_logger.info( + f"Created {count} 'is_before' links in {timeit.default_timer() - start_time} seconds" + ) + + if count != len(previous_links): + self.parser_logger.error( + f"Created {count} 'is_before' links, {len(previous_links)} links expected" + ) + + def _create_child_links( + self, child_links: list[ChildLink], entry_nodes: list[NodeData], project_label: str + ): + """Create the 'is_child_of' relations between nodes""" self.parser_logger.info("Creating 'is_child_of' links") + start_time = timeit.default_timer() + + node_ids = set([node.id for node in entry_nodes]) + # we collect nodes with a parent id which is the id of an entry (normalised) + # and nodes where a synonym was used to designate the parent (unnormalised) + normalised_child_links = [] + unnormalised_child_links = [] for child_link in child_links: - child_id = child_link["id"] - parent = child_link["parent_id"] - lc, parent_id = parent.split(":") - query = f""" MATCH (p:{multi_label}:ENTRY) WHERE $parent_id IN p.tags_ids_""" + lc - query += f""" - MATCH (c:{multi_label}) WHERE c.id= $child_id - CREATE (c)-[r:is_child_of]->(p) - RETURN r + if child_link["parent_id"] in node_ids: + normalised_child_links.append(child_link) + else: + unnormalised_child_links.append(child_link) + + # adding normalised links is easy as we can directly match parent entries + normalised_query = f""" + UNWIND $normalised_child_links as child_link + MATCH (p:{project_label}) USING INDEX p:{project_label}(id) + WHERE p.id = child_link.parent_id + MATCH (c:{project_label}) USING INDEX c:{project_label}(id) + WHERE c.id = child_link.id + CREATE (c)-[relations:is_child_of]->(p) + WITH relations + UNWIND relations AS relation + RETURN COUNT(relation) + """ + + # for unnormalised links, we need to group them by language code of the parent id + lc_child_links_map = collections.defaultdict(list) + for child_link in unnormalised_child_links: + lc, parent_id = child_link["parent_id"].split(":") + child_link["parent_id"] = parent_id + lc_child_links_map[lc].append(child_link) + + # we create a query for each language code + lc_queries = [] + for lc, lc_child_links in lc_child_links_map.items(): + lc_query = f""" + UNWIND $lc_child_links as child_link + MATCH (p:{project_label}) + WHERE child_link.parent_id IN p.tags_ids_{lc} + MATCH (c:{project_label}) USING INDEX c:{project_label}(id) + WHERE c.id = child_link.id + CREATE 
(c)-[relations:is_child_of]->(p) + WITH relations + UNWIND relations AS relation + RETURN COUNT(relation) """ - result = self.session.run(query, parent_id=parent_id, child_id=child_id) - if not result.value(): - self.parser_logger.warning( - f"parent not found for child {child_id} with parent {parent_id}" - ) - - def _create_fulltext_index(self, taxonomy_name: str, branch_name: str): - """Create indexes for search""" - project_name = self._get_project_name(taxonomy_name, branch_name) - query = ( - f"""CREATE FULLTEXT INDEX {project_name+'_SearchIds'} IF NOT EXISTS - FOR (n:{project_name}) ON EACH [n.id]\n""" - + """ - OPTIONS {indexConfig: {`fulltext.analyzer`: 'keyword'}}""" + lc_queries.append((lc_query, lc_child_links)) + + count = 0 + + if normalised_child_links: + normalised_result = self.session.run( + normalised_query, normalised_child_links=normalised_child_links + ) + count += normalised_result.value()[0] + + if lc_queries: + for lc_query, lc_child_links in lc_queries: + lc_result = self.session.run(lc_query, lc_child_links=lc_child_links) + count += lc_result.value()[0] + + self.parser_logger.info( + f"Created {count} 'is_child_of' links in {timeit.default_timer() - start_time} seconds" ) - self.session.run(query) - language_codes = [lang.alpha2 for lang in list(iso639.languages) if lang.alpha2 != ""] - tags_prefixed_lc = ["n.tags_" + lc for lc in language_codes] - tags_prefixed_lc = ", ".join(tags_prefixed_lc) - query = f"""CREATE FULLTEXT INDEX {project_name+'_SearchTags'} IF NOT EXISTS - FOR (n:{project_name}) ON EACH [{tags_prefixed_lc}]""" - self.session.run(query) + if count != len(child_links): + self.parser_logger.error( + f"Created {count} 'is_child_of' links, {len(child_links)} links expected" + ) - def _create_parsing_errors_node(self, taxonomy_name: str, branch_name: str): + def _create_parsing_errors_node(self, taxonomy_name: str, branch_name: str, project_label: str): """Create node to list parsing errors""" - multi_label = self._create_multi_label(taxonomy_name, branch_name) + self.parser_logger.info("Creating 'ERRORS' node") + start_time = timeit.default_timer() + query = f""" - CREATE (n:{multi_label}:ERRORS) + CREATE (n:{project_label}:ERRORS) SET n.id = $project_name SET n.branch_name = $branch_name SET n.taxonomy_name = $taxonomy_name @@ -161,7 +223,7 @@ def _create_parsing_errors_node(self, taxonomy_name: str, branch_name: str): SET n.errors = $errors_list """ params = { - "project_name": self._get_project_name(taxonomy_name, branch_name), + "project_name": project_label, "branch_name": branch_name, "taxonomy_name": taxonomy_name, "warnings_list": self.parser_logger.parsing_warnings, @@ -169,17 +231,67 @@ def _create_parsing_errors_node(self, taxonomy_name: str, branch_name: str): } self.session.run(query, params) + self.parser_logger.info( + f"Created 'ERRORS' node in {timeit.default_timer() - start_time} seconds" + ) + + def _create_node_id_index(self, project_label: str): + """Create index for search query optimization""" + query = f""" + CREATE INDEX {project_label}_id_index IF NOT EXISTS FOR (n:{project_label}) ON (n.id) + """ + self.session.run(query) + + def _create_node_fulltext_index(self, project_label: str): + """Create indexes for text search""" + query = ( + f"""CREATE FULLTEXT INDEX {project_label+'_SearchIds'} IF NOT EXISTS + FOR (n:{project_label}) ON EACH [n.id]\n""" + + """ + OPTIONS {indexConfig: {`fulltext.analyzer`: 'keyword'}}""" + ) + self.session.run(query) + + language_codes = [lang.alpha2 for lang in list(iso639.languages) if 
lang.alpha2 != ""] + tags_prefixed_lc = ["n.tags_" + lc for lc in language_codes] + tags_prefixed_lc = ", ".join(tags_prefixed_lc) + query = f"""CREATE FULLTEXT INDEX {project_label+'_SearchTags'} IF NOT EXISTS + FOR (n:{project_label}) ON EACH [{tags_prefixed_lc}]""" + self.session.run(query) + + def _create_node_indexes(self, project_label: str): + """Create node indexes""" + self.parser_logger.info("Creating indexes") + start_time = timeit.default_timer() + + self._create_node_id_index(project_label) + self._create_node_fulltext_index(project_label) + + self.parser_logger.info(f"Created indexes in {timeit.default_timer() - start_time} seconds") + + def _write_to_database(self, taxonomy: Taxonomy, taxonomy_name: str, branch_name: str): + project_label = self._get_project_name(taxonomy_name, branch_name) + # First create nodes, then create node indexes to accelerate relationship creation, then create relationships + self._create_other_nodes(taxonomy.other_nodes, project_label) + self._create_entry_nodes(taxonomy.entry_nodes, project_label) + self._create_node_indexes(project_label) + self._create_child_links(taxonomy.child_links, taxonomy.entry_nodes, project_label) + self._create_previous_links(taxonomy.previous_links, project_label) + # Lastly create the parsing errors node + self._create_parsing_errors_node(taxonomy_name, branch_name, project_label) + def __call__(self, filename: str, branch_name: str, taxonomy_name: str): """Process the file""" + start_time = timeit.default_timer() + branch_name = normalizing(branch_name, char="_") - multi_label = self._create_multi_label(taxonomy_name, branch_name) taxonomy_parser = TaxonomyParser() taxonomy = taxonomy_parser.parse_file(filename, self.parser_logger) - self.create_nodes([*taxonomy.entry_nodes, *taxonomy.other_nodes], multi_label) - self.create_child_link(taxonomy.child_links, multi_label) - self.create_previous_link(taxonomy.previous_links, multi_label) - self._create_fulltext_index(taxonomy_name, branch_name) - self._create_parsing_errors_node(taxonomy_name, branch_name) + self._write_to_database(taxonomy, taxonomy_name, branch_name) + + self.parser_logger.info( + f"Finished parsing {taxonomy_name} in {timeit.default_timer() - start_time} seconds" + ) if __name__ == "__main__": @@ -192,7 +304,7 @@ def __call__(self, filename: str, branch_name: str, taxonomy_name: str): # Initialize neo4j uri = os.environ.get("NEO4J_URI", "bolt://localhost:7687") driver = GraphDatabase.driver(uri) - session = driver.session() + session = driver.session(database="neo4j") # Pass session variable to parser object parse = Parser(session) diff --git a/parser/openfoodfacts_taxonomy_parser/parser/taxonomy_parser.py b/parser/openfoodfacts_taxonomy_parser/parser/taxonomy_parser.py index d46a52fa..c864469a 100644 --- a/parser/openfoodfacts_taxonomy_parser/parser/taxonomy_parser.py +++ b/parser/openfoodfacts_taxonomy_parser/parser/taxonomy_parser.py @@ -11,11 +11,6 @@ from ..normalizer import normalizing -def ellipsis(text, max=20): - """Cut a text adding eventual ellipsis if we do not display it fully""" - return text[:max] + ("..." 
if len(text) > max else "") - - class NodeType(str, Enum): TEXT = "TEXT" SYNONYMS = "SYNONYMS" @@ -37,10 +32,8 @@ class NodeData: def to_dict(self): return { "id": self.id, - "is_before": self.is_before, "main_language": self.main_language, "preceding_lines": self.preceding_lines, - "parent_tag": self.parent_tag, "src_position": self.src_position, **self.properties, **self.tags, @@ -229,11 +222,11 @@ def _harvest_entries(self, filename: str, entries_start_line: int) -> Iterator[N if self._entry_end(line, data): if data.id in saved_nodes: msg = ( - "Entry with same id %s already created, " - "duplicate id in file at line %s. " + f"Entry with same id {data.id} already created, " + f"duplicate id in file at line {data.src_position}. " "Node creation cancelled." ) - self.parser_logger.error(msg, data.id, data.src_position) + self.parser_logger.error(msg) else: data = self._remove_separating_line(data) yield data # another function will use this dictionary to create a node @@ -257,9 +250,7 @@ def _harvest_entries(self, filename: str, entries_start_line: int) -> Iterator[N lc, value = self._get_lc_value(line[10:]) except ValueError: self.parser_logger.error( - "Missing language code at line %d ? '%s'", - line_number + 1, - ellipsis(line), + f"Missing language code at line {line_number + 1} ? '{self.parser_logger.ellipsis(line)}'" ) else: data.tags["tags_" + lc] = value @@ -276,9 +267,7 @@ def _harvest_entries(self, filename: str, entries_start_line: int) -> Iterator[N lc, value = self._get_lc_value(line) except ValueError: self.parser_logger.error( - "Missing language code at line %d ? '%s'", - line_number + 1, - ellipsis(line), + f"Missing language code at line {line_number + 1} ? '{self.parser_logger.ellipsis(line)}'" ) else: data.tags["tags_" + lc] = tags @@ -313,9 +302,7 @@ def _harvest_entries(self, filename: str, entries_start_line: int) -> Iterator[N property_name, lc, property_value = line.split(":", 2) except ValueError: self.parser_logger.error( - "Reading error at line %d, unexpected format: '%s'", - line_number + 1, - ellipsis(line), + f"Reading error at line {line_number + 1}, unexpected format: '{self.parser_logger.ellipsis(line)}'" ) else: # in case there is space before or after the colons @@ -325,9 +312,7 @@ def _harvest_entries(self, filename: str, entries_start_line: int) -> Iterator[N correctly_written.match(property_name) and correctly_written.match(lc) ): self.parser_logger.error( - "Reading error at line %d, unexpected format: '%s'", - line_number + 1, - ellipsis(line), + f"Reading error at line {line_number + 1}, unexpected format: '{self.parser_logger.ellipsis(line)}'" ) if property_name: data.properties["prop_" + property_name + "_" + lc] = property_value @@ -339,7 +324,7 @@ def _harvest_entries(self, filename: str, entries_start_line: int) -> Iterator[N def _create_taxonomy(self, filename: str) -> Taxonomy: """Create the taxonomy from the file""" - self.parser_logger.info("Parsing taxonomy file %s", filename) + self.parser_logger.info(f"Parsing {filename}") harvested_header_data, entries_start_line = self._header_harvest(filename) entry_nodes: list[NodeData] = [] other_nodes = [ @@ -372,13 +357,12 @@ def parse_file(self, filename: str, logger: ParserConsoleLogger | None = None) - start_time = timeit.default_timer() filename = self._normalized_filename(filename) taxonomy = self._create_taxonomy(filename) - end_time = timeit.default_timer() - self.parser_logger.info("Parsing done in %s seconds", end_time - start_time) + self.parser_logger.info(f"Parsing done in 
{timeit.default_timer() - start_time} seconds.") self.parser_logger.info( - "Found %d nodes", len(taxonomy.entry_nodes) + len(taxonomy.other_nodes) + f"Found {len(taxonomy.entry_nodes) + len(taxonomy.other_nodes)} nodes" ) - self.parser_logger.info("Found %d previous links", len(taxonomy.previous_links)) - self.parser_logger.info("Found %d child links", len(taxonomy.child_links)) + self.parser_logger.info(f"Found {len(taxonomy.previous_links)} previous links") + self.parser_logger.info(f"Found {len(taxonomy.child_links)} child links") return taxonomy