"Golden spike" PR #488

Draft
wants to merge 47 commits into
base: main
Choose a base branch
from
Draft

"Golden spike" PR #488

wants to merge 47 commits into from

Conversation

knighton
Copy link
Contributor

@knighton knighton commented Oct 28, 2023

This big PR (having abandoned all pretense, from a gentler time perhaps, that it would be right and just to dither over a warp sequence of little PRs, not that we have ever done that, but insofar as that could be avoided, because we should think bigly, and not split ourselves into little incremental pieces, which would be difficult to follow and, I daresay, might become big and complex in their own right for reasons downright specious and silly) is laid out as follows:

Preparations:

  • Yuge code reorg
  • Judo chops to various Areas of Concern

Borg-style assimilation of other people's projects:

  • How we stream Parquets
  • How we stream Delta tables
  • Bonus: how we stream Lance datasets

1. Reorienting the codebase around where and how we focus

Some say it's the most beautiful code reorg they've ever seen.

1.1. Moved benchmark dirs together out of scripts/ into a new top-level benchmarks/.

benchmarks/
├── backends
│   ├── generate_datasets.py
│   ├── __init__.py
│   ├── iterate_datasets.py
│   └── task.py
├── compression
│   ├── bench.py
│   └── plot.py
├── epoch
│   └── bench.py
├── hashing
│   ├── bench.py
│   └── plot.py
├── partitioning
│   ├── bench.py
│   ├── diff.py
│   ├── plot.py
│   ├── txt.py
│   └── web.py
├── samples
│   └── bench_and_plot.py
├── serialization
│   ├── compare.py
│   └── survey_fixed_decimals.py
└── shuffling
    ├── bench.py
    ├── plot.py
    └── vis.py

9 directories, 20 files

1.2. examples/, which entirely consisted of Python notebooks, is now notebooks/.

This is to rectify the patterns of confusion I have been seeing in the GitHub traffic since the time of the dinosaurs, and also to make way for item 3 below.

notebooks/
├── cifar10.ipynb
├── facesynthetics.ipynb
├── multiprocess_dataset_conversion.ipynb
├── spark_dataframe_to_MDS.ipynb
└── synthetic_nlp.ipynb

1 directory, 5 files

1.3. The modality-specific second-level directories under streaming/ and their bastard-stepchild convert/ subdirs are now grouped into a reborn top-level examples/ directory, as examples/(modality)/(dataset)/.

  • With the streaming dataset subclass (i.e., the reading) and conversion code (i.e., the writing) together in one place.
  • This sort of catalog of dataset examples properly belongs outside of the streaming/ tree; nobody is importing that stuff, they are referencing it if anything, and everybody rolls their own usage in practice.
  • Relating to some argumentation about overall direction that I posted to Slack a few days ago.
  • This includes the not-mainly-about-benchmarking remainder of scripts/, like the webvid demos, now in their proper place (examples/multimodal/webvid/).
  • Technically, this excludes vision/'s base.py, which is rolled into the main streaming/base/ as vision.py next to StreamingDataset, etc. (i.e., the VisionStreamingDataset sub-base-class).
  • It is so much nicer; just tree what it looks like currently:
examples/
├── multimodal
│   ├── laion400m
│   │   ├── convert_and_upload.py
│   │   ├── convert_and_upload.sh
│   │   ├── download_data.sh
│   │   ├── download_meta.sh
│   │   ├── __init__.py
│   │   └── README.md
│   └── webvid
│       ├── read.py
│       ├── webvid
│       │   ├── bench_inside.py
│       │   ├── bench_outside_dt.py
│       │   ├── bench_outside_gi.py
│       │   └── plot.py
│       └── write
│           ├── crawl_webvid.py
│           ├── crawl_webvid_subsets.py
│           ├── extract_webvid_videos.py
│           ├── __init__.py
│           └── README.md
├── text
│   ├── c4
│   │   ├── README.md
│   │   ├── read.py
│   │   └── write.py
│   ├── enwiki_tok
│   │   ├── __init__.py
│   │   ├── mds
│   │   │   ├── create_pretraining_data.py
│   │   │   ├── __init__.py
│   │   │   ├── make_eval.sh
│   │   │   ├── make_train_parallel.py
│   │   │   ├── merge_shard_groups.py
│   │   │   ├── pick_eval_samples.py
│   │   │   ├── README.md
│   │   │   ├── tokenization.py
│   │   │   └── vocab.txt
│   │   └── tfrecord
│   │       ├── count_samples.py
│   │       ├── create_pretraining_data.py
│   │       ├── __init__.py
│   │       ├── make_eval.sh
│   │       ├── make_train_parallel.py
│   │       ├── make_train.sh
│   │       ├── pick_eval_samples.py
│   │       ├── tokenization.py
│   │       └── vocab.txt
│   ├── enwiki_txt
│   │   ├── README.md
│   │   ├── read.py
│   │   └── write.py
│   └── pile
│       ├── README.md
│       ├── read.py
│       └── write.py
└── vision
    ├── ade20k
    │   ├── README.md
    │   ├── read.py
    │   └── write.py
    ├── cifar10
    │   ├── README.md
    │   ├── read.py
    │   ├── write_fake.py
    │   └── write.py
    ├── coco
    │   ├── README.md
    │   ├── read.py
    │   └── write.py
    └── imagenet
        ├── README.md
        ├── read.py
        └── write.py

18 directories, 57 files

1.4. And now for what we've all been waiting for: base/ has been sublimed

  • StreamingDataset and its supporting code is all of what Streaming is about, and accordingly base/ of streaming/base/ is merged upward into streaming/.
  • No longer are 75% of the second-level dirs under streaming/ half-assed collections of example datasets that nobody apparently uses, and we never work on.
  • The other base/ in streaming/base/format/base/(reader/writer base classes) is merged upward as well, as an example to others.
  • Do note: for external users, the import paths are unchanged (e.g., from streaming import MDSWriter, StreamingDataset), as although our imports lived under streaming/base/... in the old setup, streaming/__init__.py mirrored them all at the top level (another missed sign that they were in the wrong place?). A quick sanity check of this appears after the tree below.
streaming
├── array.py
├── batching
│   ├── __init__.py
│   ├── per_stream.py
│   ├── random.py
│   └── stratified.py
├── cli
│   ├── <dir omitted because I'm still in the arena trying things here>
├── compression.py
├── constant.py
├── converters
│   ├── dataframe_to_mds.py
│   ├── __init__.py
│   └── README.md
├── dataloader.py
├── dataset.py
├── distributed.py
├── format
│   ├── delta
│   │   ├── indexing.py
│   │   ├── __init__.py
│   │   ├── reader.py
│   │   └── state.py
│   ├── index.py
│   ├── __init__.py
│   ├── json
│   │   ├── encodings.py
│   │   ├── __init__.py
│   │   ├── reader.py
│   │   ├── README.md
│   │   └── writer.py
│   ├── lance
│   │   ├── indexing.py
│   │   ├── __init__.py
│   │   └── reader.py
│   ├── mds
│   │   ├── encodings.py
│   │   ├── __init__.py
│   │   ├── reader.py
│   │   ├── README.md
│   │   └── writer.py
│   ├── parquet
│   │   ├── indexing.py
│   │   ├── __init__.py
│   │   └── reader.py
│   ├── reader.py
│   ├── writer.py
│   └── xsv
│       ├── encodings.py
│       ├── __init__.py
│       ├── reader.py
│       ├── README.md
│       └── writer.py
├── hashing.py
├── __init__.py
├── local.py
├── partition
│   ├── __init__.py
│   ├── orig.py
│   └── relaxed.py
├── py.typed
├── sampling.py
├── shared
│   ├── array.py
│   ├── barrier.py
│   ├── __init__.py
│   ├── memory.py
│   ├── prefix.py
│   └── scalar.py
├── shuffle
│   ├── __init__.py
│   ├── naive.py
│   ├── py1b.py
│   ├── py1br.py
│   ├── py1e.py
│   ├── py1s.py
│   └── py2s.py
├── spanner.py
├── storage
│   ├── download.py
│   ├── extra.py
│   ├── __init__.py
│   └── upload.py
├── stream.py
├── util
│   ├── importing.py
│   ├── __init__.py
│   ├── merging.py
│   ├── migration.py
│   ├── pretty.py
│   ├── retrying.py
│   └── shared.py
├── _version.py
├── vision.py
└── world.py
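
As a quick sanity check of the compatibility claim above, a minimal sketch (the output path and column spec are made up for illustration):

```python
# Illustrative check that public import paths survive the reorg: these
# symbols were already mirrored at the top level by streaming/__init__.py,
# and after merging base/ upward they live there natively.
from streaming import MDSWriter, StreamingDataset

# Write a toy dataset, then stream it back (path/columns are made up).
with MDSWriter(out='/tmp/toy_mds', columns={'text': 'str'}) as writer:
    writer.write({'text': 'hello'})

dataset = StreamingDataset(local='/tmp/toy_mds')
print(len(dataset))
```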

1.5. Summary

Moves:

  • examples/ -> notebooks/
  • streaming/{multimodal, vision, text}/ -> examples/
  • scripts/ -> mostly benchmarks/, except for some to examples/
  • streaming/base/format/base/ -> streaming/format/
  • streaming/base/ -> streaming/

Comparison of dir trees to get a feel; data/ on the rhs is gitignored, kept because it seemed mildly interesting (current | this PR):

├── docs                                ├── benchmarks
│   └── source                          │   ├── backends
│       ├── examples -> ../../examples  │   ├── compression
│       ├── fundamentals                │   ├── epoch
│       ├── getting_started             │   ├── hashing
│       ├── how_to_guides               │   ├── partitioning
│       ├── _static                     │   ├── samples
│       │   ├── css                     │   ├── serialization
│       │   ├── images                  │   └── shuffling
│       │   └── js                      ├── data
│       └── _templates                  │   └── compare-backends
│           └── sidebar                 │       └── gold
├── examples                            │           ├── csv
├── mosaicml_streaming.egg-info         │           │   ├── train
├── regression                          │           │   └── val
├── scripts                             │           ├── delta
│   ├── compression                     │           │   ├── train
│   ├── epoch                           │           │   │   └── _delta_log
│   ├── hashing                         │           │   └── val
│   ├── partition                       │           │       └── _delta_log
│   ├── samples                         │           ├── jsonl
│   ├── serialization                   │           │   ├── train
│   ├── shuffle                         │           │   └── val
│   └── webvid                          │           ├── lance
├── streaming                           │           │   ├── train
│   ├── base                            │           │   │   ├── data
│   │   ├── batching                    │           │   │   ├── _transactions
│   │   ├── converters                  │           │   │   └── _versions
│   │   ├── format                      │           │   └── val
│   │   │   ├── base                    │           │       ├── data
│   │   │   ├── json                    │           │       ├── _transactions
│   │   │   ├── mds                     │           │       └── _versions
│   │   │   └── xsv                     │           ├── mds
│   │   ├── partition                   │           │   ├── train
│   │   ├── shared                      │           │   └── val
│   │   ├── shuffle                     │           └── parquet
│   │   └── storage                     │               ├── train
│   ├── multimodal                      │               └── val
│   │   └── convert                     ├── docs
│   │       ├── laion                   │   └── source
│   │       │   └── laion400m           │       ├── fundamentals
│   │       └── webvid                  │       ├── getting_started
│   ├── text                            │       ├── how_to_guides
│   │   └── convert                     │       ├── notebooks -> ../../notebooks/
│   │       └── enwiki                  │       ├── _static
│   │           ├── mds                 │       │   ├── css
│   │           └── tfrecord            │       │   ├── images
│   └── vision                          │       │   └── js
│       └── convert                     │       └── _templates
└── tests                               │           └── sidebar
    ├── base                            ├── examples
    │   └── converters                  │   ├── multimodal
    └── common                          │   │   ├── laion400m
                                        │   │   └── webvid
                                        │   │       ├── webvid
                                        │   │       └── write
                                        │   ├── text
                                        │   │   ├── c4
                                        │   │   ├── enwiki_tok
                                        │   │   │   ├── mds
                                        │   │   │   └── tfrecord
                                        │   │   ├── enwiki_txt
                                        │   │   └── pile
                                        │   └── vision
                                        │       ├── ade20k
                                        │       ├── cifar10
                                        │       ├── coco
                                        │       └── imagenet
                                        ├── mosaicml_streaming.egg-info
                                        ├── notebooks
                                        ├── regression
                                        ├── streaming
                                        │   ├── batching
                                        │   ├── cli
                                        │   ├── converters
                                        │   ├── format
                                        │   │   ├── delta
                                        │   │   ├── json
                                        │   │   ├── lance
                                        │   │   ├── mds
                                        │   │   ├── parquet
                                        │   │   └── xsv
                                        │   ├── partition
                                        │   ├── shared
                                        │   ├── shuffle
                                        │   ├── storage
                                        │   └── util
                                        └── tests
                                            ├── base
                                            │   └── converters
                                            └── common

Saving the code used for the "cat(zip(*jagged text blocks))" trick here for my own future reference. By the way, is there any interest in rewriting streaming with algebraic variable names? Could be big...

```python
# Usage: python this_script.py left.txt right.txt
# Pads two text files to a common width/height and prints them side by side.
from sys import argv

# Read a file into a list of lines.
load = lambda f: open(f).read().strip().split('\n')

# Width of the widest line.
get_max_w = lambda ss: max(map(len, ss))

# Right-pad every line to width z with fill character c.
set_w = lambda ss, z, c: list(map(lambda s: s + c * (z - len(s)), ss))

# Height of the tallest block.
get_max_h = lambda *sss: max(map(len, sss))

# Bottom-pad a block with empty lines to height h.
set_h = lambda ss, h: ss + [''] * (h - len(ss))

# Divider column: h rows of the fill character repeated w times.
get_div = lambda h, w, c: [w * c] * h

# Strip trailing whitespace, re-jagging the rightmost block.
jag = lambda ss: list(map(lambda s: s.rstrip(), ss))

# Concatenate the blocks row-wise.
w_join = lambda *sss: list(map(''.join, zip(*sss)))

pad_c = ' '
div_c = ' '
div_w = 2

_, f0, f1 = argv

ss0 = load(f0)
w0 = get_max_w(ss0)
ss0 = set_w(ss0, w0, pad_c)

ss1 = load(f1)
w1 = get_max_w(ss1)
ss1 = set_w(ss1, w1, pad_c)

h = get_max_h(ss0, ss1)
ss0 = set_h(ss0, h)
ss1 = set_h(ss1, h)

# Pad widths again: the lines just appended by set_h are empty.
ss0 = set_w(ss0, w0, pad_c)
ss1 = set_w(ss1, w1, pad_c)

div = get_div(h, div_w, div_c)

ss1 = jag(ss1)
ss = w_join(ss0, div, ss1)
print('\n'.join(ss))
```

2. Plugging various gaps

2.1. Old streaming/base/util.py had gotten excessive and incoherent and was divided five ways

  • Not totes sold on my module naming, but at least their methods divide very cleanly, as y'all know.
  • I dream of a world where functional-style modules are always clearly named after topical mass nouns or gerunds/non-finite verbs, avoiding collisions with finite-verby APIs or keywords.
  • This PR is too big to review easily, so just dumping which methods went where seems like the easiest way.
  • Astute/grizzled observers will notice a missing method or two; the storage-related stuff went to streaming/storage/.

importing.py

"""User-friendly import exception message."""

def get_import_exception_message(package_name: str, extra_deps: str) -> str:

merging.py

How did dataset merges become so goddamned complicated? Is this what it would look like if our storage API were just not up to the task...

"""Merging serialized streaming datasets."""

def merge_index(*args: Any, **kwargs: Any):

def _merge_index_from_list(index_file_urls: List[Union[str, Tuple[str, str]]],
                           out: Union[str, Tuple[str, str]],
                           keep_local: bool = True,
                           download_timeout: int = 60) -> None:

def _merge_index_from_root(out: Union[str, Tuple[str, str]],
                           keep_local: bool = True,
                           download_timeout: int = 60) -> None:
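
For orientation, a hypothetical pair of calls suggested by the two private helpers above; the exact public dispatch of merge_index(*args, **kwargs) may differ:

```python
# Hypothetical usage, inferred from the private helper signatures above.

# 1) Merge explicitly listed per-group index files into one index at `out`:
merge_index(
    ['s3://bucket/ds/group-0/index.json', 's3://bucket/ds/group-1/index.json'],
    out='s3://bucket/ds',
    keep_local=True,
    download_timeout=60,
)

# 2) Or walk everything under `out` and merge whatever indexes are found:
merge_index(out='s3://bucket/ds', keep_local=True)
```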

migration.py (module added to util/ afterward)

"""Graceful migration of StreamingDataset arguments."""

# ...

pretty.py

"""Conversions between human-friendly string forms and int/float."""

# Rewrote it, see 2.2.

retrying.py

"""Decorator that retries the wrapped function with backoff."""

@overload
def retry(
    exc_class: Union[Type[Exception], Sequence[Type[Exception]]] = ...,
    num_attempts: int = ...,
    initial_backoff: float = ...,
    max_jitter: float = ...,
) -> Callable[[TCallable], TCallable]:

@overload
def retry(exc_class: TCallable) -> TCallable:

# error: Type "(TCallable@retry) -> TCallable@retry" cannot be assigned to type
# "(func: Never) -> Never"
def retry(  # type: ignore
    exc_class: Union[TCallable, Type[Exception], Sequence[Type[Exception]]] = Exception,
    num_attempts: int = 3,
    initial_backoff: float = 1.0,
    max_jitter: float = 0.5,
):
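
Since the dual @overload form can be puzzling, here is a minimal sketch of how a decorator satisfying both shapes might be implemented; this is an illustration, not the module's actual body:

```python
import functools
import random
import time

def retry(exc_class=Exception, num_attempts=3, initial_backoff=1.0,
          max_jitter=0.5):
    """Retry the wrapped function on exc_class with exponential backoff."""
    # Bare form (@retry): the sole "argument" is actually the function
    # itself, not an exception class or a sequence of them.
    if callable(exc_class) and not (isinstance(exc_class, type) and
                                    issubclass(exc_class, BaseException)):
        return retry()(exc_class)

    excs = tuple(exc_class) if isinstance(exc_class, (list, tuple)) \
        else (exc_class,)

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(num_attempts):
                try:
                    return func(*args, **kwargs)
                except excs:
                    if attempt == num_attempts - 1:
                        raise
                    # Exponential backoff plus a little random jitter.
                    time.sleep(initial_backoff * 2**attempt +
                               random.uniform(0, max_jitter))

        return wrapper

    return decorator

@retry  # bare form, all defaults
def flaky_read():
    ...

@retry(OSError, num_attempts=5)  # configured form
def flaky_download():
    ...
```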

shared.py

"""Shared memory utilities."""

def clean_stale_shared_memory() -> None:

2.2. Needed more than number_abbrev_to_int and bytes_to_int for purposes later in this PR, so now we have streaming/util/pretty.py

  • More generalized impl; e.g., we now have elapsed-time support via simply returning _normalize_float(duration, _duration_units).
  • Reasonably hardened to pet edge cases.
  • More proper about how it treats base-1000 vs base-1024.
  • Different philosophy about handling arbitrary user input; it is, of course, a hard disciplinarian.
  • Conversions in the other direction (normalize_(type) <-> prettify_(type)) are still to be done; hopefully someone else will rewrite this module again for that lol.

"""Conversions between human-friendly string forms and int/float."""

def unpack_strs(text: str, sep: str = ',') -> List[str]:
    """Pass a list as a comma-delimited string."""

def unpack_str2str(text: str, sep: str = ',', eq: str = '=') -> Dict[str, str]:
    """Pass a dict as a comma- and equals-delimited string."""

def _normalize_arg(text: str, units: Dict[str, int], to_type: type) -> Union[int, float]:
    """Normalize a human-friendly unit string to number."""

def _normalize_num(arg: Union[int, float, str], units: Dict[str, int],
                   to_type: type) -> Union[int, float]:
    """Normalize from human-friendly argument to number."""

def _normalize_int(arg: Union[int, str], units: Dict[str, int]) -> int:
    """Normalize from human-friendly argument to int."""

def _normalize_float(arg: Union[int, float, str], units: Dict[str, int]) -> float:
    """Normalize from human-friendly argument to float."""

def _get_units(base: int, names: List[str]) -> Dict[str, int]:
    """Generate units mapping given a base and names of powers of that base."""

def normalize_dec_bytes(bytes: Union[int, str]) -> int:
    """Normalize from human-friendly base-1000 bytes to int."""

def normalize_bin_bytes(bytes: Union[int, str]) -> int:
    """Normalize from human-friendly base-1024 bytes to int."""

def normalize_bytes(bytes: Union[int, str]) -> int:
    """Normalize from human-friendly base-1000 or base-1024 bytes to int."""

def normalize_count(count: Union[int, str]) -> int:
    """Normalize from human-friendly count to int."""

def normalize_duration(duration: Union[int, float, str]) -> float:
    """Normalize from human-friendly duration to float."""

2.3. My god our storage API sucks and this is too much for one humble PR, so let's partly bypass and mostly wrap it with more functionality

  • Piecemeal, when required for "later in this PR" purposes.
  • This includes wrapping/patching some behavior that made certain APIs useless for my purposes, which I would consider a bug, but I can't rule out other use cases, and I have some less typical requirements, etc.
  • Also, imho the docs were lacking on the pedantics of how they and their callers like their edge cases sorted. Speaking of sorting...
  • No doubt this module will continue to accumulate one-off hax until being collected as part of dividing this code cloud-wise instead of *load-wise, a la zip(*download_upload_py), any day now...
def wait_for_file_to_exist(filename: str, poll_interval: float, timeout: float,
                           err_msg: str) -> None:
    """Wait for the file to exist till timeout seconds. Raise an Exception after that."""
    
def _normalize_path(path: str) -> Tuple[str, bool]:
    """Analyze the path, returning URI scheme-normalized form and whether is on local.filesystem."""

def _normalize_dir(dirname: str) -> str:
    """Normalize a dirname to contain one trailing slash."""

def walk_prefix(prefix: str) -> List[str]:
    """Recursively list all file paths matching a prefix in sorted order."""

def walk_dir(root: str) -> List[str]:
    """Recursively list the given directory in sorted order."""
    
def _filter(keep: Optional[Union[str, Pattern, Callable[[str], bool]]],
            paths: Optional[Iterable[str]]) -> Iterable[str]:
    """Filter the given paths according to the pattern or predicate."""

def _get_overlap(want: Set[str], have: Set[str]) -> Dict[str, Any]:
    """Get the overlap between two sets for informational/debugging purposes."""

def list_dataset_files(
        *,
        local: str,
        remote: Optional[str] = None,
        split: Optional[str] = None,
        paths: Optional[Iterable[str]] = None,
        keep: Optional[Union[str, Pattern, Callable[[str], bool]]] = None) -> List[str]:
    """Collect all/certain local/remote dataset files, which are then filtered."""

def smart_download_file(*,
                        remote: str,
                        local: str,
                        timeout: Union[float, str] = 60,
                        size: Optional[Union[int, str]] = None,
                        max_size: Optional[Union[int, str]] = None,
                        hashes: Optional[Dict[str, str]] = None) -> None:
    """Download a file from the remote path to the local path, with size/hash checks."""

knighton marked this pull request as draft on October 28, 2023 at 06:31.
review-notebook-app commented: Check out this pull request on ReviewNB to see visual diffs and provide feedback on Jupyter Notebooks.

```
    ─ ──────── ─ ────── ─ ──────────── ─ ──────── ─ ────────────── ─ ────── ─ ──────────── ─ ────────────── ─
    │ format   │    sec │      samples │  usec/sp │          bytes │  files │   bytes/file │ max bytes/file │
    ─ ──────── ─ ────── ─ ──────────── ─ ──────── ─ ────────────── ─ ────── ─ ──────────── ─ ────────────── ─
    │ csv      │  5.131 │    2,097,152 │    2.446 │    171,899,840 │     41 │    4,192,679 │      8,388,616 │
    │ jsonl    │ 12.535 │    2,097,152 │    5.977 │    211,747,148 │     51 │    4,151,904 │      8,388,607 │
    │ lance    │  1.074 │    2,097,152 │    0.512 │    176,961,928 │     19 │    9,313,785 │     11,067,536 │
    │ mds      │  8.649 │    2,097,152 │    4.124 │    176,880,177 │     23 │    7,690,442 │      8,388,604 │
    │ parquet  │  1.323 │    2,097,152 │    0.631 │     63,528,364 │     16 │    3,970,522 │      3,973,860 │
    │ delta    │ 16.881 │    2,097,152 │    8.050 │     55,106,514 │     66 │      834,947 │      1,710,970 │
    ─ ──────── ─ ────── ─ ──────────── ─ ──────── ─ ────────────── ─ ────── ─ ──────────── ─ ────────────── ─
```