Skip to content

Commit

Permalink
cloud-1608 async-support
Browse files Browse the repository at this point in the history
- add async support for upsert
- provide async
testing and requesting
  • Loading branch information
omerXfaruq committed Jan 20, 2024
1 parent 5ba0320 commit b49298a
Show file tree
Hide file tree
Showing 5 changed files with 93 additions and 5 deletions.
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ packages = [{ include = "upstash_vector" }]
[tool.poetry.dependencies]
python = "^3.8"
requests = "^2.31.0"
httpx = "^0.26.0"

[tool.poetry.group.dev.dependencies]
mypy = "^1.8.0"
Expand Down
31 changes: 31 additions & 0 deletions tests/core/test_upsert.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import pytest

from upstash_vector import Index
from upstash_vector.types import Vector

Expand Down Expand Up @@ -30,6 +32,35 @@ def test_upsert_tuple(index: Index):
assert res[1].vector == v2_values


@pytest.mark.asyncio
async def test_upsert_tuple_async(index: Index):
v1_id = "id1"
v1_metadata = {"metadata_field": "metadata_value"}
v1_values = [0.1, 0.2]

v2_id = "id2"
v2_values = [0.3, 0.4]

await index.upsert_async(
vectors=[
(v1_id, v1_values, v1_metadata),
(v2_id, v2_values),
]
)

res = index.fetch(ids=[v1_id, v2_id], include_vectors=True, include_metadata=True)

assert res[0] is not None
assert res[0].id == v1_id
assert res[0].metadata == v1_metadata
assert res[0].vector == v1_values

assert res[1] is not None
assert res[1].id == v2_id
assert res[1].metadata is None
assert res[1].vector == v2_values


def test_upsert_dict(index: Index):
v1_id = "dict_id1"
v1_metadata = {"metadata_field": "metadata_value"}
Expand Down
14 changes: 13 additions & 1 deletion upstash_vector/client.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
from requests import Session
from upstash_vector.http import execute_with_parameters, generate_headers
from upstash_vector.http import (
execute_with_parameters,
execute_with_parameters_async,
generate_headers,
)
from upstash_vector.core.index_operations import IndexOperations
from typing import Any
from os import environ
Expand Down Expand Up @@ -44,6 +48,14 @@ def _execute_request(self, payload: Any = "", path: str = ""):
payload=payload,
)

async def _execute_request_async(self, payload: Any = "", path: str = ""):
url_with_path = f"{self._url}{path}"
return execute_with_parameters_async(
url=url_with_path,
headers=self._headers,
payload=payload,
)

@classmethod
def from_env(cls, retries: int = 3, retry_interval: float = 1.0) -> "Index":
"""
Expand Down
29 changes: 29 additions & 0 deletions upstash_vector/core/index_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ class IndexOperations:
def _execute_request(self, payload, path):
raise NotImplementedError("execute_request")

async def _execute_request_async(self, payload, path):
raise NotImplementedError("execute_request")

def upsert(
self,
vectors: Sequence[Union[Dict, tuple, Vector]],
Expand Down Expand Up @@ -72,6 +75,32 @@ def upsert(

return self._execute_request(payload=payload, path=UPSERT_PATH)

async def upsert_async(
self,
vectors: Sequence[Union[Dict, tuple, Vector]],
) -> str:
"""
Upserts(update or insert) vectors asynchronously. For more details check Index.upsert.
```python
from upstash_vector import Vector
res = await index.upsert_async(
vectors=[
Vector(id="id5", vector=[1, 2], metadata={"metadata_f": "metadata_v"}),
Vector(id="id6", vector=[6, 7]),
]
)
```
"""
vectors = convert_to_vectors(vectors)
payload = [
{"id": vector.id, "vector": vector.vector, "metadata": vector.metadata}
for vector in vectors
]

return await self._execute_request_async(payload=payload, path=UPSERT_PATH)

def query(
self,
vector: List[float],
Expand Down
23 changes: 19 additions & 4 deletions upstash_vector/http.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
import os
import time
from typing import Any, Dict

from httpx import AsyncClient
from requests import Session
from upstash_vector import __version__
from platform import python_version

from upstash_vector import __version__
from upstash_vector.errors import UpstashError


def generate_headers(token) -> Dict[str, str]:
headers = {
"Authorization": f"Bearer {token}",
"Upstash-Telemetry-Sdk": f"upstash-vector-py@v{__version__}",
"Upstash-Telemetry-Runtime": f"python@v{python_version()}",
}
headers["Upstash-Telemetry-Sdk"] = f"upstash-vector-py@v{__version__}"
headers["Upstash-Telemetry-Runtime"] = f"python@v{python_version()}"

if os.getenv("VERCEL"):
platform = "vercel"
Expand Down Expand Up @@ -56,3 +56,18 @@ def execute_with_parameters(
raise UpstashError(response["error"])

return response["result"]


async def execute_with_parameters_async(
url: str,
headers: Dict[str, str],
payload: Any,
) -> Any:
with AsyncClient(timeout=30) as client:
request = await client.post(url=url, headers=headers, json=payload)
response = request.json()

if response.get("error"):
raise UpstashError(response["error"])

return response["result"]

0 comments on commit b49298a

Please sign in to comment.