diff --git a/backend/scripts/debugging/onyx_vespa.py b/backend/scripts/debugging/onyx_vespa.py index 39c1b8b0d93..6073c61d8c9 100644 --- a/backend/scripts/debugging/onyx_vespa.py +++ b/backend/scripts/debugging/onyx_vespa.py @@ -256,16 +256,28 @@ def get_documents_for_tenant_connector( def search_for_document( - index_name: str, document_id: str, max_hits: int | None = 10 + index_name: str, + document_id: str | None = None, + tenant_id: str | None = None, + max_hits: int | None = 10, ) -> List[Dict[str, Any]]: - yql_query = ( - f'select * from sources {index_name} where document_id contains "{document_id}"' - ) + yql_query = f"select * from sources {index_name}" + + conditions = [] + if document_id is not None: + conditions.append(f'document_id contains "{document_id}"') + + if tenant_id is not None: + conditions.append(f'tenant_id contains "{tenant_id}"') + + if conditions: + yql_query += " where " + " and ".join(conditions) + params: dict[str, Any] = {"yql": yql_query} if max_hits is not None: params["hits"] = max_hits with get_vespa_http_client() as client: - response = client.get(f"{SEARCH_ENDPOINT}/search/", params=params) + response = client.get(f"{SEARCH_ENDPOINT}search/", params=params) response.raise_for_status() result = response.json() documents = result.get("root", {}).get("children", []) @@ -582,8 +594,15 @@ def update_document( ) -> None: update_document(self.tenant_id, connector_id, doc_id, fields) - def search_for_document(self, document_id: str) -> List[Dict[str, Any]]: - return search_for_document(self.index_name, document_id) + def delete_documents_for_tenant(self, count: int | None = None) -> None: + if not self.tenant_id: + raise Exception("Tenant ID is not set") + delete_documents_for_tenant(self.index_name, self.tenant_id, count=count) + + def search_for_document( + self, document_id: str | None = None, tenant_id: str | None = None + ) -> List[Dict[str, Any]]: + return search_for_document(self.index_name, document_id, tenant_id) def delete_document(self, connector_id: int, doc_id: str) -> None: # Delete a document. @@ -600,6 +619,147 @@ def acls(self, cc_pair_id: int, n: int | None = 10) -> None: get_document_acls(self.tenant_id, cc_pair_id, n) +def delete_where( + index_name: str, + selection: str, + cluster: str = "default", + bucket_space: str | None = None, + continuation: str | None = None, + time_chunk: str | None = None, + timeout: str | None = None, + tracelevel: int | None = None, +) -> None: + """ + Removes visited documents in `cluster` where the given selection + is true, using Vespa's 'delete where' endpoint. + + :param index_name: Typically / from your schema + :param selection: The selection string, e.g., "true" or "foo contains 'bar'" + :param cluster: The name of the cluster where documents reside + :param bucket_space: e.g. 'global' or 'default' + :param continuation: For chunked visits + :param time_chunk: If you want to chunk the visit by time + :param timeout: e.g. '10s' + :param tracelevel: Increase for verbose logs + """ + # Using index_name of form /, e.g. "nomic_ai_nomic_embed_text_v1" + # This route ends with "/docid/" since the actual ID is not specified — we rely on "selection". + path = f"/document/v1/{index_name}/docid/" + + params = { + "cluster": cluster, + "selection": selection, + } + + # Optional parameters + if bucket_space is not None: + params["bucketSpace"] = bucket_space + if continuation is not None: + params["continuation"] = continuation + if time_chunk is not None: + params["timeChunk"] = time_chunk + if timeout is not None: + params["timeout"] = timeout + if tracelevel is not None: + params["tracelevel"] = tracelevel # type: ignore + + with get_vespa_http_client() as client: + url = f"{VESPA_APPLICATION_ENDPOINT}{path}" + logger.info(f"Performing 'delete where' on {url} with selection={selection}...") + response = client.delete(url, params=params) + # (Optionally, you can keep fetching `continuation` from the JSON response + # if you have more documents to delete in chunks.) + response.raise_for_status() # will raise HTTPError if not 2xx + logger.info(f"Delete where completed with status: {response.status_code}") + print(f"Delete where completed with status: {response.status_code}") + + +def delete_documents_for_tenant( + index_name: str, + tenant_id: str, + route: str | None = None, + condition: str | None = None, + timeout: str | None = None, + tracelevel: int | None = None, + count: int | None = None, +) -> None: + """ + For the given tenant_id and index_name (often in the form /), + find documents via search_for_document, then delete them one at a time using Vespa's + /document/v1///docid/ endpoint. + + :param index_name: Typically / from your schema + :param tenant_id: The tenant to match in your Vespa search + :param route: Optional route parameter for delete + :param condition: Optional conditional remove + :param timeout: e.g. '10s' + :param tracelevel: Increase for verbose logs + """ + deleted_count = 0 + while True: + # Search for documents with the given tenant_id + docs = search_for_document( + index_name=index_name, + document_id=None, + tenant_id=tenant_id, + max_hits=100, # Fetch in batches of 100 + ) + + if not docs: + logger.info("No more documents found to delete.") + break + + with get_vespa_http_client() as client: + for doc in docs: + if count is not None and deleted_count >= count: + logger.info(f"Reached maximum delete limit of {count} documents.") + return + + fields = doc.get("fields", {}) + doc_id_value = fields.get("document_id") or fields.get("documentid") + tenant_id = fields.get("tenant_id") + if tenant_id != tenant_id: + raise Exception("Tenant ID mismatch") + + if not doc_id_value: + logger.warning( + "Skipping a document that has no document_id in 'fields'." + ) + continue + + url = f"{DOCUMENT_ID_ENDPOINT.format(index_name=index_name)}/{doc_id_value}" + + params = {} + if condition: + params["condition"] = condition + if route: + params["route"] = route + if timeout: + params["timeout"] = timeout + if tracelevel is not None: + params["tracelevel"] = str(tracelevel) + + response = client.delete(url, params=params) + if response.status_code == 200: + logger.info(f"Successfully deleted doc_id={doc_id_value}") + deleted_count += 1 + else: + logger.error( + f"Failed to delete doc_id={doc_id_value}, " + f"status={response.status_code}, response={response.text}" + ) + print( + f"Could not delete doc_id={doc_id_value}. " + f"Status={response.status_code}, response={response.text}" + ) + raise Exception( + f"Could not delete doc_id={doc_id_value}. " + f"Status={response.status_code}, response={response.text}" + ) + + logger.info(f"Deleted {deleted_count} documents in total.") + + def main() -> None: parser = argparse.ArgumentParser(description="Vespa debugging tool") parser.add_argument( @@ -630,7 +790,9 @@ def main() -> None: args = parser.parse_args() vespa_debug = VespaDebugging(args.tenant_id) - if args.action == "config": + if args.action == "delete-all-documents": + vespa_debug.delete_documents_for_tenant(args.count) + elif args.action == "config": vespa_debug.print_config() elif args.action == "connect": vespa_debug.check_connectivity()