From 866e4a80b0a4a2ea1f44b796f5ffa64a603431d7 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Piotr=20=C5=BBelasko?=
Date: Mon, 10 Jun 2024 14:29:53 -0400
Subject: [PATCH] Support for reading data from AIStore using Python SDK (#1354)

* Support for reading data from AIStore using Python SDK

* More AIStore related docs
---
 README.md                |  2 ++
 docs/getting-started.rst |  5 +++
 lhotse/serialization.py  | 70 +++++++++++++++++++++++++++++++++-------
 lhotse/utils.py          |  9 ++++++
 4 files changed, 74 insertions(+), 12 deletions(-)

diff --git a/README.md b/README.md
index b5dfbc5ee..3d4bb17f6 100644
--- a/README.md
+++ b/README.md
@@ -110,6 +110,7 @@ Lhotse uses several environment variables to customize it's behavior. They are a
 - `LHOTSE_LEGACY_OPUS_LOADING` - (`=1`) reverts to a legacy OPUS loading mechanism that triggered a new ffmpeg subprocess for each OPUS file.
 - `LHOTSE_PREPARING_RELEASE` - used internally by developers when releasing a new version of Lhotse.
 - `TORCHAUDIO_USE_BACKEND_DISPATCHER` - when set to `1` and torchaudio version is below 2.1, we'll enable the experimental ffmpeg backend of torchaudio.
+- `AIS_ENDPOINT` is read by the AIStore client to determine the AIStore endpoint URL. Required for AIStore dataloading.
 - `RANK`, `WORLD_SIZE`, `WORKER`, and `NUM_WORKERS` are internally used to inform Lhotse Shar dataloading subprocesses.
 - `READTHEDOCS` is internally used for documentation builds.
 
@@ -121,6 +122,7 @@ Lhotse uses several environment variables to customize it's behavior. They are a
 - `pip install lhotse[webdataset]`. We support "compiling" your data into WebDataset tarball format for more effective IO. You can still interact with the data as if it was a regular lazy CutSet. To learn more, check out the following tutorial: [![Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/lhotse-speech/lhotse/blob/master/examples/02-webdataset-integration.ipynb)
 - `pip install h5py` if you want to extract speech features and store them as HDF5 arrays.
 - `pip install dill`. When `dill` is installed, we'll use it to pickle CutSet that uses a lambda function in calls such as `.map` or `.filter`. This is helpful in PyTorch DataLoader with `num_jobs>0`. Without `dill`, depending on your environment, you'll see an exception or a hanging script.
+- `pip install aistore` to read manifests, tar files, and other data from AIStore using AIStore-supported URLs (set the `AIS_ENDPOINT` environment variable to activate it). See [AIStore documentation](https://aiatscale.org) for more details.
 - `pip install smart_open` to read and write manifests and data in any location supported by `smart_open` (e.g. cloud, http).
 - `pip install opensmile` for feature extraction using the OpenSmile toolkit's Python wrapper.
 
diff --git a/docs/getting-started.rst b/docs/getting-started.rst
index c6e17085c..9a299c973 100644
--- a/docs/getting-started.rst
+++ b/docs/getting-started.rst
@@ -133,6 +133,8 @@ Lhotse uses several environment variables to customize it's behavior. They are a
 
 * ``TORCHAUDIO_USE_BACKEND_DISPATCHER`` - when set to 1 and torchaudio version is below 2.1, we'll enable the experimental ffmpeg backend of torchaudio.
 
+* ``AIS_ENDPOINT`` is read by the AIStore client to determine the AIStore endpoint URL. Required for AIStore dataloading.
+
 * ``RANK``, ``WORLD_SIZE``, ``WORKER``, and ``NUM_WORKERS`` are internally used to inform Lhotse Shar dataloading subprocesses.
 
 * ``READTHEDOCS`` is internally used for documentation builds.
@@ -153,6 +155,8 @@ Optional dependencies
 
 * ``pip install dill``. When ``dill`` is installed, we'll use it to pickle CutSet that uses a lambda function in calls such as ``.map`` or ``.filter``. This is helpful in PyTorch DataLoader with ``num_jobs>0``. Without ``dill``, depending on your environment, you'll see an exception or a hanging script.
 
+* ``pip install aistore`` to read manifests, tar files, and other data from AIStore using AIStore-supported URLs (set the ``AIS_ENDPOINT`` environment variable to activate it). See AIStore_ for more details.
+
 * ``pip install smart_open`` to read and write manifests and data in any location supported by ``smart_open`` (e.g. cloud, http).
 
 * ``pip install opensmile`` for feature extraction using the OpenSmile toolkit's Python wrapper.
@@ -225,3 +229,4 @@ the speech starts roughly at the first second (100 frames):
 .. _Kaldi: https://github.com/kaldi-asr/kaldi
 .. _Icefall recipes: https://github.com/k2-fsa/icefall
 .. _orjson: https://pypi.org/project/orjson/
+.. _AIStore: https://aiatscale.org
diff --git a/lhotse/serialization.py b/lhotse/serialization.py
index 0a90d98ff..7f18b4dd4 100644
--- a/lhotse/serialization.py
+++ b/lhotse/serialization.py
@@ -1,15 +1,17 @@
 import itertools
 import json
+import os
 import sys
 import warnings
 from codecs import StreamReader, StreamWriter
+from functools import lru_cache
 from io import BytesIO, StringIO
 from pathlib import Path
 from typing import Any, Dict, Generator, Iterable, Optional, Type, Union
 
 import yaml
 
-from lhotse.utils import Pathlike, is_module_available
+from lhotse.utils import Pathlike, SmartOpen, is_module_available, is_valid_url
 from lhotse.workarounds import gzip_open_robust
 
 # TODO: figure out how to use some sort of typing stubs
@@ -28,7 +30,8 @@ def open_best(path: Pathlike, mode: str = "r"):
     either stdin or stdout depending on the mode.
     The concept is similar to Kaldi's "generalized pipes", but uses WebDataset syntax.
     """
-    if str(path) == "-":
+    strpath = str(path)
+    if strpath == "-":
         if mode == "r":
             return StdStreamWrapper(sys.stdin)
         elif mode == "w":
@@ -41,22 +44,32 @@ def open_best(path: Pathlike, mode: str = "r"):
     if isinstance(path, (BytesIO, StringIO, StreamWriter, StreamReader)):
         return path
 
-    if str(path).startswith("pipe:"):
+    if strpath.startswith("pipe:"):
         return open_pipe(path[5:], mode)
 
-    if is_module_available("smart_open"):
-        from smart_open import smart_open
+    if strpath.startswith("ais://"):
+        return open_aistore(path, mode)
 
-        # This will work with JSONL anywhere that smart_open supports, e.g. cloud storage.
-        open_fn = smart_open
-    else:
-        compressed = str(path).endswith(".gz")
-        if compressed and "t" not in mode and "b" not in mode:
+    if is_valid_url(strpath):
+        if is_aistore_available():
+            return open_aistore(path, mode)
+        elif is_module_available("smart_open"):
+            return SmartOpen.open(path, mode)
+        else:
+            raise ValueError(
+                f"In order to open URLs/URIs please run 'pip install smart_open' "
+                f"(if you're trying to use AIStore, either the Python SDK is not installed "
+                f"or {AIS_ENDPOINT_ENVVAR} is not defined)."
+            )
+
+    compressed = strpath.endswith(".gz")
+    if compressed:
+        if "t" not in mode and "b" not in mode:
             # Opening as bytes not requested explicitly, use "t" to tell gzip to handle unicode.
             mode = mode + "t"
-        open_fn = gzip_open_robust if compressed else open
+        return gzip_open_robust(path, mode)
 
-    return open_fn(path, mode)
+    return open(path, mode)
 
 
 def open_pipe(cmd: str, mode: str):
@@ -69,6 +82,39 @@ def open_pipe(cmd: str, mode: str):
     return Pipe(cmd, mode=mode, shell=True, bufsize=8092)
 
 
+AIS_ENDPOINT_ENVVAR = "AIS_ENDPOINT"
+
+
+@lru_cache
+def is_aistore_available() -> bool:
+    return AIS_ENDPOINT_ENVVAR in os.environ and is_valid_url(
+        os.environ[AIS_ENDPOINT_ENVVAR]
+    )
+
+
+@lru_cache
+def get_aistore_client():
+    if not is_module_available("aistore"):
+        raise ImportError(
+            "Please run 'pip install aistore' in order to read data from AIStore."
+        )
+    if not is_aistore_available():
+        raise ValueError(
+            "Set the AIS_ENDPOINT environment variable to a valid URL to read data from AIStore."
+        )
+    from aistore import Client
+
+    endpoint_url = os.environ["AIS_ENDPOINT"]
+    return Client(endpoint_url)
+
+
+def open_aistore(uri: str, mode: str):
+    assert "r" in mode, "We only support reading from AIStore at this time."
+    client = get_aistore_client()
+    obj = client.fetch_object_by_url(uri)
+    return obj.get().raw()
+
+
 def save_to_yaml(data: Any, path: Pathlike) -> None:
     with open_best(path, "w") as f:
         try:
diff --git a/lhotse/utils.py b/lhotse/utils.py
index be891ede5..2a44a5a04 100644
--- a/lhotse/utils.py
+++ b/lhotse/utils.py
@@ -32,6 +32,7 @@
     TypeVar,
     Union,
 )
+from urllib.parse import urlparse
 
 import click
 import numpy as np
@@ -128,6 +129,14 @@ def open(cls, uri, mode="rb", transport_params=None, **kwargs):
         )
 
 
+def is_valid_url(value: str) -> bool:
+    try:
+        result = urlparse(value)
+        return bool(result.scheme) and bool(result.netloc)
+    except AttributeError:
+        return False
+
+
 def fix_random_seed(random_seed: int):
     """
     Set the same random seed for the libraries and modules that Lhotse interacts with.
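
Usage sketch (illustration only, not part of the diff above): the endpoint URL, bucket, and object name below are hypothetical placeholders, and the example assumes an AIStore cluster is reachable at that endpoint with the referenced object present.

```python
# Minimal sketch of the read path added by this patch; all names are placeholders.
import os

from lhotse.serialization import open_best
from lhotse.utils import is_valid_url

# AIS_ENDPOINT must hold a valid URL before open_best() is first called,
# because is_aistore_available() caches its result with lru_cache.
os.environ["AIS_ENDPOINT"] = "http://aistore-proxy:51080"

# is_valid_url() only treats strings with both a scheme and a netloc as URLs;
# plain paths keep the existing gzip/local-file handling in open_best().
assert is_valid_url("ais://speech-data/cuts-000000.jsonl")
assert not is_valid_url("/data/cuts-000000.jsonl")

# "ais://" URLs (and other valid URLs, when AIStore is configured) are routed
# through the AIStore Python SDK; the result is a raw, read-only byte stream.
stream = open_best("ais://speech-data/cuts-000000.jsonl", "rb")
print(stream.read(200))
```

Design note: `is_aistore_available()` and `get_aistore_client()` are both wrapped in `lru_cache`, so the endpoint is validated once and a single SDK `Client` is reused per process; writing through `open_aistore()` is intentionally unsupported for now.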