Migration recipes for mappings and reindexing #696

Open · slint opened this issue Sep 5, 2024 · 0 comments

Cold migration (with downtime)

Deploy the code and run the following:

invenio index destroy --yes-i-know
invenio index init
invenio rdm rebuild-all-indices

For the invenio-jobs database changes, also run:

invenio alembic upgrade

Live migration (no downtime)

Deploy the code in a separate environment and run the following script:

import json
import time
from datetime import datetime, timedelta
from textwrap import dedent

import humanize
from flask import current_app
from invenio_access.permissions import system_identity
from invenio_oaiserver.percolator import (
    PERCOLATOR_MAPPING,
    _build_percolator_index_name,
)
from invenio_rdm_records.proxies import current_rdm_records
from invenio_records_resources.services.custom_fields.mappings import Mapping
from invenio_search.proxies import current_search, current_search_client
from invenio_search.utils import build_alias_name


def get_index_info(index):
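    # Resolve the concrete index name behind the write alias, plus any extra read aliases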
    write_alias = build_alias_name(index)
    indices = current_search_client.indices.get_alias(name=write_alias, ignore=[404])
    if indices.get("status") == 404:
        return None, None, None
    assert len(indices) == 1
    index_name = list(indices.keys())[0]
    aliases_resp = current_search_client.indices.get_alias(index=index_name)
    read_aliases = [
        a for a in aliases_resp[index_name]["aliases"].keys() if a != write_alias
    ]
    return index_name, write_alias, read_aliases


def reindex(old_index_name, new_index_name):
    # Set replicas to 0
    print(f"Setting replicas to 0 for {new_index_name}")
    current_search_client.indices.put_settings(
        index=new_index_name,
        body={"index": {"number_of_replicas": 0}},
    )

    # Reindex all records (this will return a Task ID)
    print(f"Reindexing {old_index_name} to {new_index_name}")
    task = current_search_client.reindex(
        body={
            "source": {"index": old_index_name},
            "dest": {
                "index": new_index_name,
                "version_type": "external_gte",
            },
        },
        wait_for_completion=False,
    )
    print(
        f"Task ID for reindexing {old_index_name} to {new_index_name}: {task['task']}"
    )
    return task


def reindex_delta(old_index_name, new_index_name, since):
    total_docs = current_search_client.count(
        index=old_index_name,
        body={"query": {"range": {"updated": {"gte": since}}}},
    )["count"]
    print(
        f"Reindexing {old_index_name} to {new_index_name} since {since} ({total_docs} docs)"
    )
    task = current_search_client.reindex(
        body={
            "source": {
                "index": old_index_name,
                "query": {"range": {"updated": {"gte": since}}},
            },
            "dest": {
                "index": new_index_name,
                "version_type": "external_gte",
            },
        },
        wait_for_completion=False,
    )
    print(
        f"Task ID for reindexing {old_index_name} to {new_index_name}: {task['task']}"
    )
    return task


def get_last_updated_ts(index_name):
    res = current_search_client.search(
        index=index_name,
        body={"size": 0, "aggs": {"last_updated": {"max": {"field": "updated"}}}},
    )
    return res["aggregations"]["last_updated"]["value_as_string"]


def check_progress(task_id):
    progress = current_search_client.tasks.get(task_id=task_id)

    if not progress["completed"]:
        total = progress["task"]["status"]["total"]
        created = progress["task"]["status"]["created"]
        if total == 0 or created == 0:
            print("Reindexing in progress: no records reindexed yet.")
            return False
        percentage = round((created / total) * 100, 2)
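        # ETA: average time per document so far, times the documents remaining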
        eta_seconds = (
            progress["task"]["running_time_in_nanos"]
            / created
            * (total - created)
            / 1_000_000_000
        )
        eta = datetime.now() + timedelta(seconds=eta_seconds)
        print(
            dedent(f"""\
        Reindexing in progress: {created}/{total} ({percentage}%) records reindexed.
        ETA: {humanize.naturaldelta(eta_seconds)} ({eta.isoformat()})
        """)
        )
        return False

    # Refresh the index
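    # (the reindex task description looks like "reindex from [old] to [new]")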
    index_name = progress["task"]["description"].split(" to ")[1][1:-1]
    total_time = progress["task"]["running_time_in_nanos"] / 1_000_000_000
    print(f"Reindexing completed in {humanize.naturaldelta(total_time)}")
    print(f"Refreshing {index_name}...")
    current_search_client.indices.refresh(index=index_name)
    print(f"Refreshed {index_name}")

    # Set replicas to 2
    print(f"Updating replicas for {index_name}")
    current_search_client.indices.put_settings(
        index=index_name,
        body={"index": {"number_of_replicas": 2}},
    )
    return True


def rollover_index(old_index, new_index):
    old_index_name, old_index_alias, read_aliases = get_index_info(old_index)
    new_index_name, new_index_alias, _ = get_index_info(new_index)

    # Update aliases
    alias_ops = []
    alias_ops.append({"remove": {"index": old_index_name, "alias": new_index_alias}})
    alias_ops.append({"add": {"index": new_index_name, "alias": new_index_alias}})
    for alias in read_aliases:
        # Skip aliases that are not part of the new index name
        if alias not in new_index_name:
            continue

        alias_ops.append({"remove": {"index": old_index_name, "alias": alias}})
        alias_ops.append({"add": {"index": new_index_name, "alias": alias}})
    current_search_client.indices.update_aliases(body={"actions": alias_ops})


def delete_old_index(old_index):
    old_index_name, _, _ = get_index_info(old_index)

    # Delete old index
    current_search_client.indices.delete(index=old_index_name)


def run_pre_deploy(old_index, new_index, custom_fields_cfg=None):
    old_index_name, old_index_alias, read_aliases = get_index_info(old_index)
    new_index_name, new_index_alias, _ = get_index_info(new_index)
    assert new_index_name is None, f"New index {new_index_name} already exists."

    # Create the new index
    (new_index_name, _), (new_index_alias, _) = current_search.create_index(
        index=new_index, create_write_alias=True
    )
    
    # Handle custom fields
    if custom_fields_cfg:
        custom_fields = current_app.config[custom_fields_cfg]
        properties = Mapping.properties_for_fields(None, custom_fields)
        current_search_client.indices.put_mapping(
            index=new_index_name,
            body={"properties": properties},
        )

    # Reindex all records
    task = reindex(old_index_name, new_index_name)
    while not check_progress(task["task"]):
        print("Waiting 10sec for reindexing to complete...")
        time.sleep(10)


def run_sync(old_index, new_index):
    old_index_name, old_index_alias, read_aliases = get_index_info(old_index)
    new_index_name, new_index_alias, _ = get_index_info(new_index)

    # Reindex all records since last update
    since = get_last_updated_ts(new_index_name)
    while True:
        task = reindex_delta(old_index_name, new_index_name, since)
        while not check_progress(task["task"]):
            print("Waiting 10sec for reindexing to complete...")
            time.sleep(10)

        # Refresh
        current_search_client.indices.refresh(index=new_index_name)

        # Check if there are newer documents
        new_index_latest = get_last_updated_ts(new_index_name)
        old_index_latest = get_last_updated_ts(old_index_name)
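        # "updated" timestamps are ISO-8601 strings, so string comparison is chronological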
        if new_index_latest >= old_index_latest:
            print("No new documents to sync.")
            break

        print(f"More documents to sync: {new_index_latest} > {old_index_latest}")

        # Reindex since we started the current reindexing task
        since = new_index_latest
        # Give an opportunity to interrupt the sync
        print("Press Ctrl+C to stop the sync...")
        time.sleep(10)


def run_post_deploy(old_index, new_index):
    # Rollover the index
    rollover_index(old_index, new_index)


def update_records_percolator(index=None):
    index = index or current_app.config["OAISERVER_RECORD_INDEX"]
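    # Create the percolator index from the record mapping, extended with the
    # percolator "query" field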
    percolator_index = _build_percolator_index_name(index)
    mapping_path = current_search.mappings[index]
    with open(mapping_path, "r") as body:
        mapping = json.load(body)
        mapping["mappings"]["properties"].update(PERCOLATOR_MAPPING["properties"])
        current_search_client.indices.create(index=percolator_index, body=mapping)
    # reindex all percolator queries from OAISets
    oaipmh_service = current_rdm_records.oaipmh_server_service
    oaipmh_service.rebuild_index(identity=system_identity)

Run the following commands:

#
# Affiliations
#
OLD_AFFILIATIONS_INDEX = "affiliations-affiliation-v1.0.0"
NEW_AFFILIATIONS_INDEX = "affiliations-affiliation-v2.0.0"

run_pre_deploy(OLD_AFFILIATIONS_INDEX, NEW_AFFILIATIONS_INDEX)

# Once code is deployed, rollover the index
run_post_deploy(OLD_AFFILIATIONS_INDEX, NEW_AFFILIATIONS_INDEX)


#
# Funders
#
OLD_FUNDERS_INDEX = "funders-funder-v1.0.0"
NEW_FUNDERS_INDEX = "funders-funder-v2.0.0"

run_pre_deploy(OLD_FUNDERS_INDEX, NEW_FUNDERS_INDEX)

# Once code is deployed, rollover the index
run_post_deploy(OLD_FUNDERS_INDEX, NEW_FUNDERS_INDEX)


#
# Names
#
OLD_NAMES_INDEX = "names-name-v1.0.0"
NEW_NAMES_INDEX = "names-name-v2.0.0"

run_pre_deploy(OLD_NAMES_INDEX, NEW_NAMES_INDEX)

# Once code is deployed, rollover the index
run_post_deploy(OLD_NAMES_INDEX, NEW_NAMES_INDEX)


#
# Communities
#
OLD_COMMUNITIES_INDEX = "communities-communities-v1.0.0"
NEW_COMMUNITIES_INDEX = "communities-communities-v2.0.0"

run_pre_deploy(
    OLD_COMMUNITIES_INDEX, NEW_COMMUNITIES_INDEX,
    custom_fields_cfg="COMMUNITIES_CUSTOM_FIELDS",
)

# Sync new and updated documents
run_sync(OLD_COMMUNITIES_INDEX, NEW_COMMUNITIES_INDEX)

# Once code is deployed, rollover the index
run_post_deploy(OLD_COMMUNITIES_INDEX, NEW_COMMUNITIES_INDEX)
# Run a last sync to make sure all documents are up-to-date
run_sync(OLD_COMMUNITIES_INDEX, NEW_COMMUNITIES_INDEX)

#
# Users
#
OLD_USERS_INDEX = "users-user-v2.0.0"
NEW_USERS_INDEX = "users-user-v3.0.0"

run_pre_deploy(OLD_USERS_INDEX, NEW_USERS_INDEX)

# Sync new and updated documents
run_sync(OLD_USERS_INDEX, NEW_USERS_INDEX)

# Once code is deployed, rollover the index
run_post_deploy(OLD_USERS_INDEX, NEW_USERS_INDEX)
# Run a last sync to make sure all documents are up-to-date
run_sync(OLD_USERS_INDEX, NEW_USERS_INDEX)

#
# Records
#
OLD_RECORDS_INDEX = "rdmrecords-records-record-v6.0.0"
NEW_RECORDS_INDEX = "rdmrecords-records-record-v7.0.0"

run_pre_deploy(OLD_RECORDS_INDEX, NEW_RECORDS_INDEX, custom_fields_cfg="RDM_CUSTOM_FIELDS")
update_records_percolator(index=NEW_RECORDS_INDEX)

# Sync new and updated documents
run_sync(OLD_RECORDS_INDEX, NEW_RECORDS_INDEX)

# Once code is deployed, rollover the index
run_post_deploy(OLD_RECORDS_INDEX, NEW_RECORDS_INDEX)
# Run a last sync to make sure all documents are up-to-date
run_sync(OLD_RECORDS_INDEX, NEW_RECORDS_INDEX)
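
# Once the rollovers are verified, the old indices can be deleted using the
# delete_old_index helper from the script above (a sketch; run only after
# confirming search works against the new indices):
for old_index in [
    OLD_AFFILIATIONS_INDEX,
    OLD_FUNDERS_INDEX,
    OLD_NAMES_INDEX,
    OLD_COMMUNITIES_INDEX,
    OLD_USERS_INDEX,
    OLD_RECORDS_INDEX,
]:
    delete_old_index(old_index)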


#
# Update record view stats events template to add `is_machine`
#
from invenio_search.proxies import current_search
# Will update all templates (including the record view events)
list(current_search.put_templates())

# You'll also need to update the latest (current month's) record view events
# index manually via the REST API:
"""
PUT /events-stats-record-view-2024-09/_mapping
{
  "properties": {
    "is_machine": {
      "type": "boolean"
    }
  }
}
"""