feat: orjson serialization (#452)
jmoralez authored Aug 26, 2024
1 parent e5a0426 commit 402139a
Showing 3 changed files with 70 additions and 37 deletions.
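The gist of the change: payloads that were previously converted to Python lists (.tolist()) and encoded through httpx's default json path are now kept as numpy arrays and encoded with orjson. A minimal before/after sketch with illustrative values (not the client's real payload):

    import json

    import numpy as np
    import orjson

    y = np.arange(5, dtype=np.float64)
    # before: materialize a Python list, then encode with the stdlib json module
    before = json.dumps({"y": y.tolist()}).encode()
    # after: orjson serializes the numpy array directly, no intermediate list
    after = orjson.dumps({"y": y}, option=orjson.OPT_SERIALIZE_NUMPY)
    assert json.loads(before) == json.loads(after)

Both produce the same JSON document on the wire; orjson just skips building the intermediate Python objects.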
54 changes: 35 additions & 19 deletions nbs/nixtla_client.ipynb
@@ -55,6 +55,7 @@
"import httpcore\n",
"import httpx\n",
"import numpy as np\n",
"import orjson\n",
"import pandas as pd\n",
"import utilsforecast.processing as ufp\n",
"from fastcore.basics import patch\n",
@@ -145,7 +146,7 @@
"from time import time, sleep\n",
"\n",
"from dotenv import load_dotenv\n",
"from fastcore.test import test_eq, test_fail, test_warns\n",
"from fastcore.test import test_eq, test_fail\n",
"from utilsforecast.data import generate_series\n",
"\n",
"from nixtla.date_features import SpecialDates"
@@ -528,7 +529,7 @@
" processed_X = ufp.process_df(\n",
" df=X_df, id_col=id_col, time_col=time_col, target_col=None,\n",
" )\n",
" X_future = processed_X.data.T.tolist()\n",
" X_future = processed_X.data.T\n",
" else:\n",
" X_future = None\n",
" x_cols = [c for c in df.columns if c not in (id_col, time_col, target_col)]\n",
@@ -661,7 +662,10 @@
" base_url = os.getenv('NIXTLA_BASE_URL', 'https://api.nixtla.io')\n",
" self._client_kwargs = {\n",
" 'base_url': base_url,\n",
" 'headers': {'Authorization': f'Bearer {api_key}'},\n",
" 'headers': {\n",
" 'Authorization': f'Bearer {api_key}',\n",
" 'Content-Type': 'application/json',\n",
" },\n",
" 'timeout': timeout,\n",
" }\n",
" self._retry_strategy = _retry_strategy(\n",
@@ -674,12 +678,17 @@
" self.supported_models = ['timegpt-1', 'timegpt-1-long-horizon']\n",
"\n",
" def _make_request(self, client: httpx.Client, endpoint: str, payload: Dict[str, Any]) -> Dict[str, Any]:\n",
" resp = client.request(\n",
" method='post',\n",
" url=endpoint,\n",
" json=payload,\n",
" )\n",
" resp_body = resp.json()\n",
" def ensure_contiguous_arrays(d: Dict[str, Any]) -> None:\n",
" for k, v in d.items():\n",
" if isinstance(v, np.ndarray):\n",
" d[k] = np.ascontiguousarray(v)\n",
" elif isinstance(v, dict):\n",
" ensure_contiguous_arrays(v) \n",
"\n",
" ensure_contiguous_arrays(payload)\n",
" content = orjson.dumps(payload, option=orjson.OPT_SERIALIZE_NUMPY)\n",
" resp = client.post(url=endpoint, content=content)\n",
" resp_body = orjson.loads(resp.content)\n",
" if resp.status_code != 200:\n",
" raise ApiError(status_code=resp.status_code, body=resp_body)\n",
" if 'data' in resp_body:\n",
@@ -993,16 +1002,16 @@
" )\n",
" processed = _tail(processed, new_input_size)\n",
" if processed.data.shape[1] > 1:\n",
" X = processed.data[:, 1:].T.tolist()\n",
" X = processed.data[:, 1:].T\n",
" logger.info(f'Using the following exogenous features: {x_cols}')\n",
" else:\n",
" X = None\n",
"\n",
" logger.info('Calling Forecast Endpoint...')\n",
" payload = {\n",
" 'series': {\n",
" 'y': processed.data[:, 0].tolist(),\n",
" 'sizes': np.diff(processed.indptr).tolist(),\n",
" 'y': processed.data[:, 0],\n",
" 'sizes': np.diff(processed.indptr),\n",
" 'X': X,\n",
" 'X_future': X_future,\n",
" },\n",
@@ -1176,16 +1185,16 @@
" target_col=target_col,\n",
" )\n",
" if processed.data.shape[1] > 1:\n",
" X = processed.data[:, 1:].T.tolist()\n",
" X = processed.data[:, 1:].T\n",
" logger.info(f'Using the following exogenous features: {x_cols}')\n",
" else:\n",
" X = None\n",
"\n",
" logger.info('Calling Anomaly Detector Endpoint...')\n",
" payload = {\n",
" 'series': {\n",
" 'y': processed.data[:, 0].tolist(),\n",
" 'sizes': np.diff(processed.indptr).tolist(),\n",
" 'y': processed.data[:, 0],\n",
" 'sizes': np.diff(processed.indptr),\n",
" 'X': X,\n",
" },\n",
" 'model': model,\n",
@@ -1360,7 +1369,14 @@
" time_col=time_col,\n",
" target_col=target_col,\n",
" )\n",
" targets = df[target_col].to_numpy()\n",
" if isinstance(df, pd.DataFrame):\n",
" # in pandas<2.2 to_numpy can lead to an object array if\n",
" # the type is a pandas nullable type, e.g. pd.Float64Dtype\n",
" # we thus use the dtype's type as the target dtype\n",
" target_dtype = df.dtypes[target_col].type\n",
" targets = df[target_col].to_numpy(dtype=target_dtype)\n",
" else:\n",
" targets = df[target_col].to_numpy()\n",
" times = df[time_col].to_numpy()\n",
" if processed.sort_idxs is not None:\n",
" targets = targets[processed.sort_idxs]\n",
@@ -1380,16 +1396,16 @@
" times = _array_tails(times, orig_indptr, np.diff(processed.indptr))\n",
" targets = _array_tails(targets, orig_indptr, np.diff(processed.indptr))\n",
" if processed.data.shape[1] > 1:\n",
" X = processed.data[:, 1:].T.tolist()\n",
" X = processed.data[:, 1:].T\n",
" logger.info(f'Using the following exogenous features: {x_cols}')\n",
" else:\n",
" X = None\n",
"\n",
" logger.info('Calling Cross Validation Endpoint...')\n",
" payload = {\n",
" 'series': {\n",
" 'y': targets.tolist(),\n",
" 'sizes': np.diff(processed.indptr).tolist(),\n",
" 'y': targets,\n",
" 'sizes': np.diff(processed.indptr),\n",
" 'X': X,\n",
" },\n",
" 'model': model,\n",
52 changes: 34 additions & 18 deletions nixtla/nixtla_client.py
@@ -25,6 +25,7 @@
import httpcore
import httpx
import numpy as np
import orjson
import pandas as pd
import utilsforecast.processing as ufp
from fastcore.basics import patch
@@ -463,7 +464,7 @@ def _preprocess(
time_col=time_col,
target_col=None,
)
X_future = processed_X.data.T.tolist()
X_future = processed_X.data.T
else:
X_future = None
x_cols = [c for c in df.columns if c not in (id_col, time_col, target_col)]
@@ -590,7 +591,10 @@ def __init__(
base_url = os.getenv("NIXTLA_BASE_URL", "https://api.nixtla.io")
self._client_kwargs = {
"base_url": base_url,
"headers": {"Authorization": f"Bearer {api_key}"},
"headers": {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json",
},
"timeout": timeout,
}
self._retry_strategy = _retry_strategy(
@@ -607,12 +611,17 @@ def __init__(
def _make_request(
self, client: httpx.Client, endpoint: str, payload: Dict[str, Any]
) -> Dict[str, Any]:
resp = client.request(
method="post",
url=endpoint,
json=payload,
)
resp_body = resp.json()
def ensure_contiguous_arrays(d: Dict[str, Any]) -> None:
for k, v in d.items():
if isinstance(v, np.ndarray):
d[k] = np.ascontiguousarray(v)
elif isinstance(v, dict):
ensure_contiguous_arrays(v)

ensure_contiguous_arrays(payload)
content = orjson.dumps(payload, option=orjson.OPT_SERIALIZE_NUMPY)
resp = client.post(url=endpoint, content=content)
resp_body = orjson.loads(resp.content)
if resp.status_code != 200:
raise ApiError(status_code=resp.status_code, body=resp_body)
if "data" in resp_body:
@@ -925,16 +934,16 @@ def forecast(
)
processed = _tail(processed, new_input_size)
if processed.data.shape[1] > 1:
X = processed.data[:, 1:].T.tolist()
X = processed.data[:, 1:].T
logger.info(f"Using the following exogenous features: {x_cols}")
else:
X = None

logger.info("Calling Forecast Endpoint...")
payload = {
"series": {
"y": processed.data[:, 0].tolist(),
"sizes": np.diff(processed.indptr).tolist(),
"y": processed.data[:, 0],
"sizes": np.diff(processed.indptr),
"X": X,
"X_future": X_future,
},
@@ -1112,16 +1121,16 @@ def detect_anomalies(
target_col=target_col,
)
if processed.data.shape[1] > 1:
X = processed.data[:, 1:].T.tolist()
X = processed.data[:, 1:].T
logger.info(f"Using the following exogenous features: {x_cols}")
else:
X = None

logger.info("Calling Anomaly Detector Endpoint...")
payload = {
"series": {
"y": processed.data[:, 0].tolist(),
"sizes": np.diff(processed.indptr).tolist(),
"y": processed.data[:, 0],
"sizes": np.diff(processed.indptr),
"X": X,
},
"model": model,
@@ -1298,7 +1307,14 @@ def cross_validation(
time_col=time_col,
target_col=target_col,
)
targets = df[target_col].to_numpy()
if isinstance(df, pd.DataFrame):
# in pandas<2.2 to_numpy can lead to an object array if
# the type is a pandas nullable type, e.g. pd.Float64Dtype
# we thus use the dtype's type as the target dtype
target_dtype = df.dtypes[target_col].type
targets = df[target_col].to_numpy(dtype=target_dtype)
else:
targets = df[target_col].to_numpy()
times = df[time_col].to_numpy()
if processed.sort_idxs is not None:
targets = targets[processed.sort_idxs]
@@ -1318,16 +1334,16 @@
times = _array_tails(times, orig_indptr, np.diff(processed.indptr))
targets = _array_tails(targets, orig_indptr, np.diff(processed.indptr))
if processed.data.shape[1] > 1:
X = processed.data[:, 1:].T.tolist()
X = processed.data[:, 1:].T
logger.info(f"Using the following exogenous features: {x_cols}")
else:
X = None

logger.info("Calling Cross Validation Endpoint...")
payload = {
"series": {
"y": targets.tolist(),
"sizes": np.diff(processed.indptr).tolist(),
"y": targets,
"sizes": np.diff(processed.indptr),
"X": X,
},
"model": model,
1 change: 1 addition & 0 deletions setup.py
@@ -43,6 +43,7 @@
install_requires=[
"fastcore",
"httpx",
"orjson",
"pandas",
"pydantic",
"tenacity",
