From d7e3c574bcee3d8ae5429e43a67ad1bb91a8dfaa Mon Sep 17 00:00:00 2001 From: Andrey Date: Fri, 31 May 2024 21:52:35 +0300 Subject: [PATCH 1/4] Add materialized view. --- engineapi/engineapi/actions.py | 314 ++++++++++++++++++--------------- 1 file changed, 173 insertions(+), 141 deletions(-) diff --git a/engineapi/engineapi/actions.py b/engineapi/engineapi/actions.py index bc0c6d74..312db96b 100644 --- a/engineapi/engineapi/actions.py +++ b/engineapi/engineapi/actions.py @@ -16,8 +16,9 @@ from hexbytes import HexBytes import requests # type: ignore from sqlalchemy.dialects.postgresql import insert -from sqlalchemy.orm import Session -from sqlalchemy import func, text, or_, and_, Subquery +from sqlalchemy.orm import Session, Query +from sqlalchemy import func, text, or_, and_, Subquery, literal_column +from sqlalchemy.sql import exists, select, column, table from sqlalchemy.engine import Row from web3 import Web3 from web3.types import ChecksumAddress @@ -1000,6 +1001,152 @@ def leaderboard_version_filter( return latest_version +def mv_pg_name(leaderboard_id: uuid.UUID) -> str: + return f"mv_leaderboard_{leaderboard_id}".replace("-", "_") + + +def mv_check(db_session: Session, leaderboard_id: uuid.UUID) -> bool: + mv_name = mv_pg_name(leaderboard_id) + exists_query = text( + f""" + SELECT EXISTS ( + SELECT FROM pg_matviews WHERE schemaname = 'public' AND matviewname = '{mv_name}' + ); + """ + ) + result = db_session.execute(exists_query).scalar().bool() + return result + + +def create_materialized_view(db_session, leaderboard_id): + # Safely format the view name using the UUID converted to string + mv_name = mv_pg_name(leaderboard_id) + + sql_command = text( + f""" + CREATE MATERIALIZED VIEW IF NOT EXISTS {mv_name} AS + SELECT + leaderboard_scores.address AS address, + leaderboard_scores.score AS score, + leaderboard_scores.points_data AS points_data, + rank() OVER (ORDER BY leaderboard_scores.score DESC, address) AS rank + FROM + leaderboard_scores + JOIN leaderboard_versions ON leaderboard_versions.leaderboard_id = leaderboard_scores.leaderboard_id + AND leaderboard_versions.version_number = leaderboard_scores.leaderboard_version_number + WHERE + leaderboard_scores.leaderboard_id = :leaderboard_id + AND leaderboard_versions.published = true + AND leaderboard_versions.version_number = ( + SELECT + max(leaderboard_versions.version_number) + FROM + leaderboard_versions + WHERE + leaderboard_versions.leaderboard_id = :leaderboard_id + AND leaderboard_versions.published = true + ) + ORDER BY leaderboard_scores.score DESC, address + """ + ) + + # Execute the command with parameters + db_session.execute(sql_command, {"leaderboard_id": str(leaderboard_id)}) + db_session.commit() + + +def update_materialized_view(db_session: Session, leaderboard_id: uuid.UUID): + + mv_name = mv_pg_name(leaderboard_id) + + if not mv_check(db_session, leaderboard_id): + try: + create_materialized_view(db_session, leaderboard_id) + db_session.commit() + except Exception as e: + db_session.rollback() + logger.error(f"Error creating materialized view: {e}") + raise # Re-raise exception after logging + else: + db_session.execute(text(f"REFRESH MATERIALIZED VIEW CONCURRENTLY {mv_name}")) + + +def get_leaderboard_materialized_view(db_session: Session, leaderboard_id: uuid.UUID): + ### materialized view name + mv_name = mv_pg_name(leaderboard_id) + + if not mv_check(db_session, leaderboard_id): + try: + create_materialized_view(db_session, leaderboard_id) + db_session.commit() + except Exception as e: + db_session.rollback() + logger.error(f"Error creating materialized view: {e}") + raise # Re-raise exception after logging + + ### construct the query + # Directly query the materialized view using text + # Define the materialized view as a table object + leaderboard_table = table( + mv_name, + column("address"), + column("score"), + column("points_data"), + column("rank"), + ) + + # Construct the select statement + query = db_session.query(leaderboard_table) + return query + + +def generate_ranking_query( + db_session: Session, + leaderboard_id: uuid.UUID, + version_number: Optional[int] = None, +) -> Query: + """ + Generate a query to get the ranking of the leaderboard + """ + + if version_number is None: + + query = get_leaderboard_materialized_view(db_session, leaderboard_id) + + else: + + latest_version = leaderboard_version_filter( + db_session=db_session, + leaderboard_id=leaderboard_id, + version_number=version_number, + ) + + query = ( + db_session.query( + LeaderboardScores.address, + LeaderboardScores.score, + LeaderboardScores.points_data, + func.rank().over(order_by=LeaderboardScores.score.desc()).label("rank"), + ) + .join( + LeaderboardVersion, + and_( + LeaderboardVersion.leaderboard_id + == LeaderboardScores.leaderboard_id, + LeaderboardVersion.version_number + == LeaderboardScores.leaderboard_version_number, + ), + ) + .filter(LeaderboardScores.leaderboard_id == leaderboard_id) + .filter( + LeaderboardVersion.published == True, + LeaderboardVersion.version_number == latest_version, + ) + ) + + return query + + def get_leaderboard_total_count( db_session: Session, leaderboard_id, version_number: Optional[int] = None ) -> int: @@ -1178,36 +1325,7 @@ def get_position( Return position by address with window size """ - latest_version = leaderboard_version_filter( - db_session=db_session, - leaderboard_id=leaderboard_id, - version_number=version_number, - ) - - query = ( - db_session.query( - LeaderboardScores.address, - LeaderboardScores.score, - LeaderboardScores.points_data.label("points_data"), - func.rank().over(order_by=LeaderboardScores.score.desc()).label("rank"), - func.row_number() - .over(order_by=LeaderboardScores.score.desc()) - .label("number"), - ) - .join( - LeaderboardVersion, - and_( - LeaderboardVersion.leaderboard_id == LeaderboardScores.leaderboard_id, - LeaderboardVersion.version_number - == LeaderboardScores.leaderboard_version_number, - ), - ) - .filter( - LeaderboardVersion.published == True, - LeaderboardVersion.version_number == latest_version, - ) - .filter(LeaderboardScores.leaderboard_id == leaderboard_id) - ) + query = generate_ranking_query(db_session, leaderboard_id, version_number) ranked_leaderboard = query.cte(name="ranked_leaderboard") @@ -1297,33 +1415,7 @@ def get_leaderboard_positions( # get public leaderboard scores with max version - latest_version = leaderboard_version_filter( - db_session=db_session, - leaderboard_id=leaderboard_id, - version_number=version_number, - ) - - # Main query - query = ( - db_session.query( - LeaderboardScores.id, - LeaderboardScores.address, - LeaderboardScores.score, - LeaderboardScores.points_data, - func.rank().over(order_by=LeaderboardScores.score.desc()).label("rank"), - ) - .join( - LeaderboardVersion, - and_( - LeaderboardVersion.leaderboard_id == LeaderboardScores.leaderboard_id, - LeaderboardVersion.version_number - == LeaderboardScores.leaderboard_version_number, - ), - ) - .filter(LeaderboardScores.leaderboard_id == leaderboard_id) - .filter(LeaderboardVersion.published == True) - .filter(LeaderboardVersion.version_number == latest_version) - ) + query = generate_ranking_query(db_session, leaderboard_id, version_number) if len(poitns_data) > 0: @@ -1353,32 +1445,7 @@ def get_qurtiles( https://docs.sqlalchemy.org/en/14/core/functions.html#sqlalchemy.sql.functions.percentile_disc """ - latest_version = leaderboard_version_filter( - db_session=db_session, - leaderboard_id=leaderboard_id, - version_number=version_number, - ) - - query = ( - db_session.query( - LeaderboardScores.address, - LeaderboardScores.score, - func.rank().over(order_by=LeaderboardScores.score.desc()).label("rank"), - ) - .join( - LeaderboardVersion, - and_( - LeaderboardVersion.leaderboard_id == LeaderboardScores.leaderboard_id, - LeaderboardVersion.version_number - == LeaderboardScores.leaderboard_version_number, - ), - ) - .filter( - LeaderboardVersion.published == True, - LeaderboardVersion.version_number == latest_version, - ) - .filter(LeaderboardScores.leaderboard_id == leaderboard_id) - ) + query = generate_ranking_query(db_session, leaderboard_id, version_number) ranked_leaderboard = query.cte(name="ranked_leaderboard") @@ -1409,34 +1476,7 @@ def get_ranks( Get the leaderboard rank buckets(rank, size, score) """ - latest_version = leaderboard_version_filter( - db_session=db_session, - leaderboard_id=leaderboard_id, - version_number=version_number, - ) - - query = ( - db_session.query( - LeaderboardScores.id, - LeaderboardScores.address, - LeaderboardScores.score, - LeaderboardScores.points_data, - func.rank().over(order_by=LeaderboardScores.score.desc()).label("rank"), - ) - .join( - LeaderboardVersion, - and_( - LeaderboardVersion.leaderboard_id == LeaderboardScores.leaderboard_id, - LeaderboardVersion.version_number - == LeaderboardScores.leaderboard_version_number, - ), - ) - .filter( - LeaderboardVersion.published == True, - LeaderboardVersion.version_number == latest_version, - ) - .filter(LeaderboardScores.leaderboard_id == leaderboard_id) - ) + query = generate_ranking_query(db_session, leaderboard_id, version_number) ranked_leaderboard = query.cte(name="ranked_leaderboard") @@ -1460,35 +1500,9 @@ def get_rank( Get bucket in leaderboard by rank """ - latest_version = leaderboard_version_filter( - db_session=db_session, - leaderboard_id=leaderboard_id, - version_number=version_number, - ) + query = generate_ranking_query(db_session, leaderboard_id, version_number) - query = ( - db_session.query( - LeaderboardScores.id, - LeaderboardScores.address, - LeaderboardScores.score, - LeaderboardScores.points_data, - func.rank().over(order_by=LeaderboardScores.score.desc()).label("rank"), - ) - .join( - LeaderboardVersion, - and_( - LeaderboardVersion.leaderboard_id == LeaderboardScores.leaderboard_id, - LeaderboardVersion.version_number - == LeaderboardScores.leaderboard_version_number, - ), - ) - .filter( - LeaderboardVersion.published == True, - LeaderboardVersion.version_number == latest_version, - ) - .filter(LeaderboardScores.leaderboard_id == leaderboard_id) - .order_by(text("rank asc, id asc")) - ) + query.order_by(text("rank asc, id asc")) ranked_leaderboard = query.cte(name="ranked_leaderboard") @@ -1546,6 +1560,12 @@ def create_leaderboard( user_id=str(user.id) if user is not None else None, ) leaderboard.resource_id = resource.id + try: + create_materialized_view(db_session, leaderboard.id) + except Exception as e: + logger.error(f"Error creating materialized view: {e}") + raise LeaderboardCreateError(f"Error creating materialized view: {e}") + db_session.commit() except Exception as e: db_session.rollback() @@ -1712,12 +1732,18 @@ def add_scores( updated_at=datetime.now(), ), ) + try: db_session.execute(result_stmt) db_session.commit() except: db_session.rollback() + try: + update_materialized_view(db_session, leaderboard_id) + except Exception as e: + logger.error(f"Error updating materialized view: {e}") + return leaderboard_scores @@ -2031,6 +2057,9 @@ def create_leaderboard_version( db_session.add(leaderboard_version) db_session.commit() + if publish: + update_materialized_view(db_session, leaderboard_id) + return leaderboard_version @@ -2051,6 +2080,9 @@ def change_publish_leaderboard_version_status( db_session.commit() + if published: + update_materialized_view(db_session, leaderboard_id) + return leaderboard_version From 656b180f13b523dc0c33b4dc904c3fbb652438ff Mon Sep 17 00:00:00 2001 From: Andrey Date: Fri, 31 May 2024 21:56:26 +0300 Subject: [PATCH 2/4] Add missing endpoint. --- engineapi/engineapi/actions.py | 47 +++++++++++++++++++--------------- 1 file changed, 27 insertions(+), 20 deletions(-) diff --git a/engineapi/engineapi/actions.py b/engineapi/engineapi/actions.py index 312db96b..3e9565d5 100644 --- a/engineapi/engineapi/actions.py +++ b/engineapi/engineapi/actions.py @@ -1154,28 +1154,35 @@ def get_leaderboard_total_count( Get the total number of position in the leaderboard """ - latest_version = leaderboard_version_filter( - db_session=db_session, - leaderboard_id=leaderboard_id, - version_number=version_number, - ) + if version_number is None: - total_count = ( - db_session.query(func.count(LeaderboardScores.id)) - .join( - LeaderboardVersion, - and_( - LeaderboardVersion.leaderboard_id == LeaderboardScores.leaderboard_id, - LeaderboardVersion.version_number - == LeaderboardScores.leaderboard_version_number, - ), - ) - .filter( - LeaderboardVersion.published == True, - LeaderboardVersion.version_number == latest_version, + query = get_leaderboard_materialized_view(db_session, leaderboard_id) + + total_count = query.count() + else: + latest_version = leaderboard_version_filter( + db_session=db_session, + leaderboard_id=leaderboard_id, + version_number=version_number, ) - .filter(LeaderboardScores.leaderboard_id == leaderboard_id) - ).scalar() + + total_count = ( + db_session.query(func.count(LeaderboardScores.id)) + .join( + LeaderboardVersion, + and_( + LeaderboardVersion.leaderboard_id + == LeaderboardScores.leaderboard_id, + LeaderboardVersion.version_number + == LeaderboardScores.leaderboard_version_number, + ), + ) + .filter( + LeaderboardVersion.published == True, + LeaderboardVersion.version_number == latest_version, + ) + .filter(LeaderboardScores.leaderboard_id == leaderboard_id) + ).scalar() return total_count From c36965f28085910e7b67d557b97b03ffa799205d Mon Sep 17 00:00:00 2001 From: Andrey Date: Fri, 31 May 2024 22:29:29 +0300 Subject: [PATCH 3/4] Add fix for position. --- engineapi/engineapi/actions.py | 67 ++++++++++++++++++++++++++++++---- 1 file changed, 60 insertions(+), 7 deletions(-) diff --git a/engineapi/engineapi/actions.py b/engineapi/engineapi/actions.py index 3e9565d5..9707d50d 100644 --- a/engineapi/engineapi/actions.py +++ b/engineapi/engineapi/actions.py @@ -1014,7 +1014,10 @@ def mv_check(db_session: Session, leaderboard_id: uuid.UUID) -> bool: ); """ ) - result = db_session.execute(exists_query).scalar().bool() + result = db_session.execute(exists_query).scalar() + + if result is None: + return False return result @@ -1029,7 +1032,8 @@ def create_materialized_view(db_session, leaderboard_id): leaderboard_scores.address AS address, leaderboard_scores.score AS score, leaderboard_scores.points_data AS points_data, - rank() OVER (ORDER BY leaderboard_scores.score DESC, address) AS rank + rank() OVER (ORDER BY leaderboard_scores.score DESC, address) AS rank, + row_number() OVER (ORDER BY leaderboard_scores.score DESC, address) AS number FROM leaderboard_scores JOIN leaderboard_versions ON leaderboard_versions.leaderboard_id = leaderboard_scores.leaderboard_id @@ -1071,7 +1075,9 @@ def update_materialized_view(db_session: Session, leaderboard_id: uuid.UUID): db_session.execute(text(f"REFRESH MATERIALIZED VIEW CONCURRENTLY {mv_name}")) -def get_leaderboard_materialized_view(db_session: Session, leaderboard_id: uuid.UUID): +def get_leaderboard_materialized_view( + db_session: Session, leaderboard_id: uuid.UUID, with_numerator: bool = False +) -> Query: ### materialized view name mv_name = mv_pg_name(leaderboard_id) @@ -1087,12 +1093,20 @@ def get_leaderboard_materialized_view(db_session: Session, leaderboard_id: uuid. ### construct the query # Directly query the materialized view using text # Define the materialized view as a table object - leaderboard_table = table( - mv_name, + + columns = [ column("address"), column("score"), column("points_data"), column("rank"), + ] + + if with_numerator: + columns.append(column("number")) + + leaderboard_table = table( + mv_name, + *columns, ) # Construct the select statement @@ -1104,6 +1118,7 @@ def generate_ranking_query( db_session: Session, leaderboard_id: uuid.UUID, version_number: Optional[int] = None, + with_numerator: bool = False, ) -> Query: """ Generate a query to get the ranking of the leaderboard @@ -1111,7 +1126,9 @@ def generate_ranking_query( if version_number is None: - query = get_leaderboard_materialized_view(db_session, leaderboard_id) + query = get_leaderboard_materialized_view( + db_session, leaderboard_id, with_numerator + ) else: @@ -1332,7 +1349,43 @@ def get_position( Return position by address with window size """ - query = generate_ranking_query(db_session, leaderboard_id, version_number) + if version_number is None: + + query = get_leaderboard_materialized_view(db_session, leaderboard_id, True) + + else: + + latest_version = leaderboard_version_filter( + db_session=db_session, + leaderboard_id=leaderboard_id, + version_number=version_number, + ) + + query = ( + db_session.query( + LeaderboardScores.address, + LeaderboardScores.score, + LeaderboardScores.points_data.label("points_data"), + func.rank().over(order_by=LeaderboardScores.score.desc()).label("rank"), + func.row_number() + .over(order_by=LeaderboardScores.score.desc()) + .label("number"), + ) + .join( + LeaderboardVersion, + and_( + LeaderboardVersion.leaderboard_id + == LeaderboardScores.leaderboard_id, + LeaderboardVersion.version_number + == LeaderboardScores.leaderboard_version_number, + ), + ) + .filter( + LeaderboardVersion.published == True, + LeaderboardVersion.version_number == latest_version, + ) + .filter(LeaderboardScores.leaderboard_id == leaderboard_id) + ) ranked_leaderboard = query.cte(name="ranked_leaderboard") From 4e1fda513b855b6354b4141bb3f61f4d9cd903ae Mon Sep 17 00:00:00 2001 From: Andrey Date: Fri, 31 May 2024 23:15:28 +0300 Subject: [PATCH 4/4] Add missing commit. --- engineapi/engineapi/actions.py | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/engineapi/engineapi/actions.py b/engineapi/engineapi/actions.py index 9707d50d..706b79ab 100644 --- a/engineapi/engineapi/actions.py +++ b/engineapi/engineapi/actions.py @@ -1072,7 +1072,8 @@ def update_materialized_view(db_session: Session, leaderboard_id: uuid.UUID): logger.error(f"Error creating materialized view: {e}") raise # Re-raise exception after logging else: - db_session.execute(text(f"REFRESH MATERIALIZED VIEW CONCURRENTLY {mv_name}")) + db_session.execute(text(f"REFRESH MATERIALIZED VIEW {mv_name}")) + db_session.commit() def get_leaderboard_materialized_view( @@ -2118,7 +2119,10 @@ def create_leaderboard_version( db_session.commit() if publish: - update_materialized_view(db_session, leaderboard_id) + try: + update_materialized_view(db_session, leaderboard_id) + except Exception as e: + logger.error(f"Error updating materialized view: {e}") return leaderboard_version @@ -2141,7 +2145,10 @@ def change_publish_leaderboard_version_status( db_session.commit() if published: - update_materialized_view(db_session, leaderboard_id) + try: + update_materialized_view(db_session, leaderboard_id) + except Exception as e: + logger.error(f"Error updating materialized view: {e}") return leaderboard_version @@ -2175,6 +2182,11 @@ def delete_leaderboard_version( db_session.delete(leaderboard_version) db_session.commit() + try: + update_materialized_view(db_session, leaderboard_id) + except Exception as e: + logger.error(f"Error updating materialized view: {e}") + return leaderboard_version