Skip to content

Commit

Permalink
Add delete all for tenants in Vespa
Browse files Browse the repository at this point in the history
  • Loading branch information
pablonyx committed Feb 13, 2025
1 parent 12b2126 commit 27e4aaa
Showing 1 changed file with 170 additions and 8 deletions.
178 changes: 170 additions & 8 deletions backend/scripts/debugging/onyx_vespa.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,16 +256,28 @@ def get_documents_for_tenant_connector(


def search_for_document(
index_name: str, document_id: str, max_hits: int | None = 10
index_name: str,
document_id: str | None = None,
tenant_id: str | None = None,
max_hits: int | None = 10,
) -> List[Dict[str, Any]]:
yql_query = (
f'select * from sources {index_name} where document_id contains "{document_id}"'
)
yql_query = f"select * from sources {index_name}"

conditions = []
if document_id is not None:
conditions.append(f'document_id contains "{document_id}"')

if tenant_id is not None:
conditions.append(f'tenant_id contains "{tenant_id}"')

if conditions:
yql_query += " where " + " and ".join(conditions)

params: dict[str, Any] = {"yql": yql_query}
if max_hits is not None:
params["hits"] = max_hits
with get_vespa_http_client() as client:
response = client.get(f"{SEARCH_ENDPOINT}/search/", params=params)
response = client.get(f"{SEARCH_ENDPOINT}search/", params=params)
response.raise_for_status()
result = response.json()
documents = result.get("root", {}).get("children", [])
Expand Down Expand Up @@ -582,8 +594,15 @@ def update_document(
) -> None:
update_document(self.tenant_id, connector_id, doc_id, fields)

def search_for_document(self, document_id: str) -> List[Dict[str, Any]]:
return search_for_document(self.index_name, document_id)
def delete_documents_for_tenant(self, count: int | None = None) -> None:
if not self.tenant_id:
raise Exception("Tenant ID is not set")
delete_documents_for_tenant(self.index_name, self.tenant_id, count=count)

def search_for_document(
    self, document_id: str | None = None, tenant_id: str | None = None
) -> List[Dict[str, Any]]:
    """Search this instance's index, optionally filtered by document/tenant ID.

    Thin wrapper around the module-level ``search_for_document``; both filters
    are passed through unchanged, and its result is returned as-is.

    :param document_id: Optional document ID filter.
    :param tenant_id: Optional tenant ID filter.
    """
    matches = search_for_document(self.index_name, document_id, tenant_id)
    return matches

def delete_document(self, connector_id: int, doc_id: str) -> None:
# Delete a document.
Expand All @@ -600,6 +619,147 @@ def acls(self, cc_pair_id: int, n: int | None = 10) -> None:
get_document_acls(self.tenant_id, cc_pair_id, n)


def delete_where(
    index_name: str,
    selection: str,
    cluster: str = "default",
    bucket_space: str | None = None,
    continuation: str | None = None,
    time_chunk: str | None = None,
    timeout: str | None = None,
    tracelevel: int | None = None,
) -> None:
    """
    Issue a single 'delete where' request: an HTTP DELETE against the
    document/v1 ``docid/`` route with a ``selection`` expression instead of a
    concrete document ID.

    Only one request is sent; any continuation token in the response is not
    followed, so callers wanting chunked deletion must re-invoke with
    ``continuation`` themselves.

    :param index_name: Typically <namespace>/<document-type> from your schema
    :param selection: The selection string, e.g., "true" or "foo contains 'bar'"
    :param cluster: The name of the cluster where documents reside
    :param bucket_space: e.g. 'global' or 'default'
    :param continuation: For chunked visits
    :param time_chunk: If you want to chunk the visit by time
    :param timeout: e.g. '10s'
    :param tracelevel: Increase for verbose logs
    """
    # Query-parameter name -> caller-supplied value; entries left as None are
    # omitted from the request entirely.
    optional_params = {
        "bucketSpace": bucket_space,
        "continuation": continuation,
        "timeChunk": time_chunk,
        "timeout": timeout,
        "tracelevel": tracelevel,
    }

    params: dict[str, Any] = {"cluster": cluster, "selection": selection}
    params.update(
        {key: value for key, value in optional_params.items() if value is not None}
    )

    # The route ends with "/docid/" because no single ID is targeted; the
    # "selection" parameter decides which documents are removed.
    url = f"{VESPA_APPLICATION_ENDPOINT}/document/v1/{index_name}/docid/"

    with get_vespa_http_client() as client:
        logger.info(f"Performing 'delete where' on {url} with selection={selection}...")
        response = client.delete(url, params=params)
        # Surface error responses to the caller instead of continuing silently.
        response.raise_for_status()

        # Same message goes to both the log and stdout.
        completion_message = (
            f"Delete where completed with status: {response.status_code}"
        )
        logger.info(completion_message)
        print(completion_message)


def delete_documents_for_tenant(
    index_name: str,
    tenant_id: str,
    route: str | None = None,
    condition: str | None = None,
    timeout: str | None = None,
    tracelevel: int | None = None,
    count: int | None = None,
) -> None:
    """
    For the given tenant_id and index_name (often in the form <namespace>/<document-type>),
    find documents via search_for_document, then delete them one at a time using Vespa's
    /document/v1/<namespace>/<document-type>/docid/<document-id> endpoint.

    Documents are fetched in batches of 100 and the search is repeated until it
    returns nothing, the ``count`` limit is reached, or a batch makes no
    progress (every hit lacked a document id).

    :param index_name: Typically <namespace>/<document-type> from your schema
    :param tenant_id: The tenant to match in your Vespa search
    :param route: Optional route parameter for delete
    :param condition: Optional conditional remove
    :param timeout: e.g. '10s'
    :param tracelevel: Increase for verbose logs
    :param count: Maximum number of documents to delete; None means no limit
    :raises Exception: If a hit's tenant_id differs from the requested one, or
        if a delete request returns a status other than 200
    """
    # The delete query parameters do not depend on the document, so build them once.
    params: dict[str, str] = {}
    if condition:
        params["condition"] = condition
    if route:
        params["route"] = route
    if timeout:
        params["timeout"] = timeout
    if tracelevel is not None:
        params["tracelevel"] = str(tracelevel)

    base_url = DOCUMENT_ID_ENDPOINT.format(index_name=index_name)

    deleted_count = 0
    limit_reached = False

    while not limit_reached:
        # Search for documents with the given tenant_id
        docs = search_for_document(
            index_name=index_name,
            document_id=None,
            tenant_id=tenant_id,
            max_hits=100,  # Fetch in batches of 100
        )

        if not docs:
            logger.info("No more documents found to delete.")
            break

        deleted_in_batch = 0
        with get_vespa_http_client() as client:
            for doc in docs:
                if count is not None and deleted_count >= count:
                    logger.info(f"Reached maximum delete limit of {count} documents.")
                    limit_reached = True
                    break

                fields = doc.get("fields", {})
                doc_id_value = fields.get("document_id") or fields.get("documentid")

                # Keep the hit's tenant in its own variable: rebinding the
                # `tenant_id` parameter would make this check a no-op and
                # change the filter used by the next search batch.
                doc_tenant_id = fields.get("tenant_id")
                if doc_tenant_id != tenant_id:
                    raise Exception("Tenant ID mismatch")

                if not doc_id_value:
                    logger.warning(
                        "Skipping a document that has no document_id in 'fields'."
                    )
                    continue

                # NOTE(review): the path segment is taken from the hit's
                # document_id/documentid field; confirm this matches how the
                # documents are keyed in Vespa.
                url = f"{base_url}/{doc_id_value}"

                response = client.delete(url, params=params)
                if response.status_code == 200:
                    logger.info(f"Successfully deleted doc_id={doc_id_value}")
                    deleted_count += 1
                    deleted_in_batch += 1
                else:
                    logger.error(
                        f"Failed to delete doc_id={doc_id_value}, "
                        f"status={response.status_code}, response={response.text}"
                    )
                    print(
                        f"Could not delete doc_id={doc_id_value}. "
                        f"Status={response.status_code}, response={response.text}"
                    )
                    raise Exception(
                        f"Could not delete doc_id={doc_id_value}. "
                        f"Status={response.status_code}, response={response.text}"
                    )

        # If every hit in the batch was skipped, the same batch would come back
        # on the next search; stop rather than loop forever.
        if not limit_reached and deleted_in_batch == 0:
            logger.warning(
                "No documents could be deleted from the last batch; stopping."
            )
            break

    logger.info(f"Deleted {deleted_count} documents in total.")


def main() -> None:
parser = argparse.ArgumentParser(description="Vespa debugging tool")
parser.add_argument(
Expand Down Expand Up @@ -630,7 +790,9 @@ def main() -> None:
args = parser.parse_args()
vespa_debug = VespaDebugging(args.tenant_id)

if args.action == "config":
if args.action == "delete-all-documents":
vespa_debug.delete_documents_for_tenant(args.count)
elif args.action == "config":
vespa_debug.print_config()
elif args.action == "connect":
vespa_debug.check_connectivity()
Expand Down

0 comments on commit 27e4aaa

Please sign in to comment.