Skip to content

Commit

Permalink
Support for reading data from AIStore using Python SDK (#1354)
Browse files Browse the repository at this point in the history
* Support for reading data from AIStore using Python SDK

* More AIStore related docs
  • Loading branch information
pzelasko authored Jun 10, 2024
1 parent 4d57d53 commit 866e4a8
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 12 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ Lhotse uses several environment variables to customize it's behavior. They are a
- `LHOTSE_LEGACY_OPUS_LOADING` - (`=1`) reverts to a legacy OPUS loading mechanism that triggered a new ffmpeg subprocess for each OPUS file.
- `LHOTSE_PREPARING_RELEASE` - used internally by developers when releasing a new version of Lhotse.
- `TORCHAUDIO_USE_BACKEND_DISPATCHER` - when set to `1` and torchaudio version is below 2.1, we'll enable the experimental ffmpeg backend of torchaudio.
- `AIS_ENDPOINT` is read by AIStore client to determine AIStore endpoint URL. Required for AIStore dataloading.
- `RANK`, `WORLD_SIZE`, `WORKER`, and `NUM_WORKERS` are internally used to inform Lhotse Shar dataloading subprocesses.
- `READTHEDOCS` is internally used for documentation builds.

Expand All @@ -121,6 +122,7 @@ Lhotse uses several environment variables to customize it's behavior. They are a
- `pip install lhotse[webdataset]`. We support "compiling" your data into WebDataset tarball format for more effective IO. You can still interact with the data as if it was a regular lazy CutSet. To learn more, check out the following tutorial: [![Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/lhotse-speech/lhotse/blob/master/examples/02-webdataset-integration.ipynb)
- `pip install h5py` if you want to extract speech features and store them as HDF5 arrays.
- `pip install dill`. When `dill` is installed, we'll use it to pickle CutSet that uses a lambda function in calls such as `.map` or `.filter`. This is helpful in PyTorch DataLoader with `num_jobs>0`. Without `dill`, depending on your environment, you'll see an exception or a hanging script.
- `pip install aistore` to read manifests, tar fles, and other data from AIStore using AIStore-supported URLs (set `AIS_ENDPOINT` environment variable to activate it). See [AIStore documentation](https://aiatscale.org) for more details.
- `pip install smart_open` to read and write manifests and data in any location supported by `smart_open` (e.g. cloud, http).
- `pip install opensmile` for feature extraction using the OpenSmile toolkit's Python wrapper.

Expand Down
5 changes: 5 additions & 0 deletions docs/getting-started.rst
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,8 @@ Lhotse uses several environment variables to customize it's behavior. They are a

* ``TORCHAUDIO_USE_BACKEND_DISPATCHER`` - when set to 1 and torchaudio version is below 2.1, we'll enable the experimental ffmpeg backend of torchaudio.

* ``AIS_ENDPOINT`` is read by AIStore client to determine AIStore endpoint URL. Required for AIStore dataloading.

* ``RANK``, ``WORLD_SIZE``, ``WORKER``, and ``NUM_WORKERS`` are internally used to inform Lhotse Shar dataloading subprocesses.

* ``READTHEDOCS`` is internally used for documentation builds.
Expand All @@ -153,6 +155,8 @@ Optional dependencies

* ``pip install dill``. When ``dill`` is installed, we'll use it to pickle CutSet that uses a lambda function in calls such as ``.map`` or ``.filter``. This is helpful in PyTorch DataLoader with ``num_jobs>0``. Without ``dill``, depending on your environment, you'll see an exception or a hanging script.

* ``pip install aistore`` to read manifests, tar fles, and other data from AIStore using AIStore-supported URLs (set ``AIS_ENDPOINT`` environment variable to activate it). See |AIStore| for more details.

* ``pip install smart_open`` to read and write manifests and data in any location supported by ``smart_open`` (e.g. cloud, http).

* ``pip install opensmile`` for feature extraction using the OpenSmile toolkit's Python wrapper.
Expand Down Expand Up @@ -225,3 +229,4 @@ the speech starts roughly at the first second (100 frames):
.. _Kaldi: https://github.com/kaldi-asr/kaldi
.. _Icefall recipes: https://github.com/k2-fsa/icefall
.. _orjson: https://pypi.org/project/orjson/
.. _AIStore: https://aiatscale.org
70 changes: 58 additions & 12 deletions lhotse/serialization.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
import itertools
import json
import os
import sys
import warnings
from codecs import StreamReader, StreamWriter
from functools import lru_cache
from io import BytesIO, StringIO
from pathlib import Path
from typing import Any, Dict, Generator, Iterable, Optional, Type, Union

import yaml

from lhotse.utils import Pathlike, is_module_available
from lhotse.utils import Pathlike, SmartOpen, is_module_available, is_valid_url
from lhotse.workarounds import gzip_open_robust

# TODO: figure out how to use some sort of typing stubs
Expand All @@ -28,7 +30,8 @@ def open_best(path: Pathlike, mode: str = "r"):
either stdin or stdout depending on the mode.
The concept is similar to Kaldi's "generalized pipes", but uses WebDataset syntax.
"""
if str(path) == "-":
strpath = str(path)
if strpath == "-":
if mode == "r":
return StdStreamWrapper(sys.stdin)
elif mode == "w":
Expand All @@ -41,22 +44,32 @@ def open_best(path: Pathlike, mode: str = "r"):
if isinstance(path, (BytesIO, StringIO, StreamWriter, StreamReader)):
return path

if str(path).startswith("pipe:"):
if strpath.startswith("pipe:"):
return open_pipe(path[5:], mode)

if is_module_available("smart_open"):
from smart_open import smart_open
if strpath.startswith("ais://"):
return open_aistore(path, mode)

# This will work with JSONL anywhere that smart_open supports, e.g. cloud storage.
open_fn = smart_open
else:
compressed = str(path).endswith(".gz")
if compressed and "t" not in mode and "b" not in mode:
if is_valid_url(strpath):
if is_aistore_available():
return open_aistore(path, mode)
elif is_module_available("smart_open"):
return SmartOpen.open(path, mode)
else:
raise ValueError(
f"In order to open URLs/URIs please run 'pip install smart_open' "
f"(if you're trying to use AIStore, either the Python SDK is not installed "
f"or {AIS_ENDPOINT_ENVVAR} is not defined."
)

compressed = strpath.endswith(".gz")
if compressed:
if "t" not in mode and "b" not in mode:
# Opening as bytes not requested explicitly, use "t" to tell gzip to handle unicode.
mode = mode + "t"
open_fn = gzip_open_robust if compressed else open
return gzip_open_robust(path, mode)

return open_fn(path, mode)
return open(path, mode)


def open_pipe(cmd: str, mode: str):
Expand All @@ -69,6 +82,39 @@ def open_pipe(cmd: str, mode: str):
return Pipe(cmd, mode=mode, shell=True, bufsize=8092)


AIS_ENDPOINT_ENVVAR = "AIS_ENDPOINT"


@lru_cache
def is_aistore_available() -> bool:
return AIS_ENDPOINT_ENVVAR in os.environ and is_valid_url(
os.environ[AIS_ENDPOINT_ENVVAR]
)


@lru_cache
def get_aistore_client():
if not is_module_available("aistore"):
raise ImportError(
"Please run 'pip install aistore' in order to read data from AIStore."
)
if not is_aistore_available():
raise ValueError(
"Set a valid URL as AIS_ENDPOINT environment variable's value to read data from AIStore."
)
from aistore import Client

endpoint_url = os.environ["AIS_ENDPOINT"]
return Client(endpoint_url)


def open_aistore(uri: str, mode: str):
assert "r" in mode, "We only support reading from AIStore at this time."
client = get_aistore_client()
object = client.fetch_object_by_url(uri)
return object.get().raw()


def save_to_yaml(data: Any, path: Pathlike) -> None:
with open_best(path, "w") as f:
try:
Expand Down
9 changes: 9 additions & 0 deletions lhotse/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
TypeVar,
Union,
)
from urllib.parse import urlparse

import click
import numpy as np
Expand Down Expand Up @@ -128,6 +129,14 @@ def open(cls, uri, mode="rb", transport_params=None, **kwargs):
)


def is_valid_url(value: str) -> bool:
try:
result = urlparse(value)
return bool(result.scheme) and bool(result.netloc)
except AttributeError:
return False


def fix_random_seed(random_seed: int):
"""
Set the same random seed for the libraries and modules that Lhotse interacts with.
Expand Down

0 comments on commit 866e4a8

Please sign in to comment.