Skip to content

Commit

Permalink
Add S3 tests for parquet formatter.
Browse files Browse the repository at this point in the history
  • Loading branch information
erykoff committed Oct 2, 2024
1 parent 9edb4b5 commit 10f9069
Showing 1 changed file with 146 additions and 0 deletions.
146 changes: 146 additions & 0 deletions tests/test_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@

import datetime
import os
import posixpath
import shutil
import unittest

try:
Expand All @@ -52,6 +54,24 @@
except ImportError:
pd = None

try:
import boto3
import botocore
from lsst.resources.s3utils import clean_test_environment_for_s3

try:
from moto import mock_aws # v5
except ImportError:
from moto import mock_s3 as mock_aws
except ImportError:
boto3 = None

try:
import fsspec
except ImportError:
fsspec = None


from lsst.daf.butler import (
Butler,
Config,
Expand All @@ -61,6 +81,7 @@
StorageClassConfig,
StorageClassFactory,
)
from lsst.resources import ResourcePath

try:
from lsst.daf.butler.delegates.arrowtable import ArrowTableDelegate
Expand Down Expand Up @@ -2123,6 +2144,131 @@ class InMemoryArrowSchemaDelegateTestCase(ParquetFormatterArrowSchemaTestCase):
configFile = os.path.join(TESTDIR, "config/basic/butler-inmemory.yaml")


@unittest.skipUnless(pa is not None, "Cannot test S3 without pyarrow.")
@unittest.skipUnless(boto3 is not None, "Cannot test S3 without boto3.")
@unittest.skipUnless(fsspec is not None, "Cannot test S3 without fsspec.")
class ParquetFormatterArrowTableS3TestCase(unittest.TestCase):
    """Tests for arrow table/parquet with S3.

    Each test runs against a mocked S3 bucket that holds the datastore,
    while the registry database is kept in a local temporary directory.
    """

    # Code is adapted from test_butler.py
    configFile = os.path.join(TESTDIR, "config/basic/butler-s3store.yaml")
    fullConfigKey = None
    validationCanFail = True

    # Default bucket name; setUp replaces it on the instance with the
    # bucket named in the datastore root of ``configFile``.
    bucketName = "anybucketname"

    # Default root; setUp replaces it on the instance with a temporary
    # directory path.
    root = "butlerRoot/"

    # setUp replaces this on the instance once the root URI is known.
    datastoreStr = [f"datastore={root}"]

    # Not an f-string: the braces are kept literally. setUp replaces this
    # on the instance once the root URI is known.
    datastoreName = ["FileDatastore@s3://{bucketName}/{root}"]

    registryStr = "/gen3.sqlite3"

    # The class body runs before the skip decorators are applied, and the
    # name ``mock_aws`` only exists when the boto3/moto imports succeeded.
    # Guard the call so that a missing optional dependency results in the
    # tests being skipped rather than a NameError while importing the module.
    mock_aws = mock_aws() if boto3 is not None else None

    def setUp(self):
        """Create a butler repository with its datastore in a mocked bucket.

        Cleanup of the temporary directories and of the S3 mock is
        registered with ``addCleanup`` so that it also happens when a later
        step of this method raises (``tearDown`` is not run in that case).
        """
        self.root = makeTestTempDir(TESTDIR)
        self.addCleanup(shutil.rmtree, self.root, ignore_errors=True)

        config = Config(self.configFile)
        uri = ResourcePath(config[".datastore.datastore.root"])
        self.bucketName = uri.netloc

        # Enable S3 mocking of tests.
        self.enterContext(clean_test_environment_for_s3())
        self.mock_aws.start()
        # Cleanups run after tearDown in reverse order of registration, so
        # the mock is still active while tearDown empties the bucket and is
        # stopped before the environment context above is exited.
        self.addCleanup(self.mock_aws.stop)

        rooturi = f"s3://{self.bucketName}/{self.root}"
        config.update({"datastore": {"datastore": {"root": rooturi}}})

        # need local folder to store registry database
        self.reg_dir = makeTestTempDir(TESTDIR)
        self.addCleanup(shutil.rmtree, self.reg_dir, ignore_errors=True)
        config["registry", "db"] = f"sqlite:///{self.reg_dir}/gen3.sqlite3"

        # MOTO needs to know that we expect Bucket bucketname to exist
        # (this used to be the class attribute bucketName)
        s3 = boto3.resource("s3")
        s3.create_bucket(Bucket=self.bucketName)

        self.datastoreStr = [f"datastore='{rooturi}'"]
        self.datastoreName = [f"FileDatastore@{rooturi}"]
        Butler.makeRepo(rooturi, config=config, forceConfigRoot=False)
        self.tmpConfigFile = posixpath.join(rooturi, "butler.yaml")

        self.butler = Butler(self.tmpConfigFile, writeable=True, run="test_run")

        # No dimensions in dataset type so we don't have to worry about
        # inserting dimension data or defining data IDs.
        self.datasetType = DatasetType(
            "data", dimensions=(), storageClass="ArrowTable", universe=self.butler.dimensions
        )
        self.butler.registry.registerDatasetType(self.datasetType)

    def tearDown(self):
        """Empty and delete the mocked bucket.

        Stopping the mock and removing the temporary directories is handled
        by the cleanups registered in ``setUp``.
        """
        bucket = boto3.resource("s3").Bucket(self.bucketName)
        try:
            bucket.objects.all().delete()
        except botocore.exceptions.ClientError as e:
            # A 404 means the key was not reachable, which is fine here;
            # anything else is unexpected.
            if e.response["Error"]["Code"] != "404":
                raise

        bucket.delete()

    def testArrowTableS3(self):
        """Round-trip an arrow table through the S3 datastore and read it
        back whole, by component, and by column selection.
        """
        tab1 = _makeSimpleArrowTable(include_multidim=True, include_masked=True)

        self.butler.put(tab1, self.datasetType, dataId={})

        # Read the whole Table.
        tab2 = self.butler.get(self.datasetType, dataId={})
        # We convert to use the numpy testing framework to handle nan
        # comparisons.
        self.assertEqual(tab1.schema, tab2.schema)
        tab1_np = arrow_to_numpy(tab1)
        tab2_np = arrow_to_numpy(tab2)
        for col in tab1.column_names:
            np.testing.assert_array_equal(tab2_np[col], tab1_np[col])
        # Read the columns.
        columns2 = self.butler.get(self.datasetType.componentTypeName("columns"), dataId={})
        self.assertEqual(len(columns2), len(tab1.schema.names))
        for i, name in enumerate(tab1.schema.names):
            self.assertEqual(columns2[i], name)
        # Read the rowcount.
        rowcount = self.butler.get(self.datasetType.componentTypeName("rowcount"), dataId={})
        self.assertEqual(rowcount, len(tab1))
        # Read the schema.
        schema = self.butler.get(self.datasetType.componentTypeName("schema"), dataId={})
        self.assertEqual(schema, tab1.schema)
        # Read just some columns a few different ways.
        tab3 = self.butler.get(self.datasetType, dataId={}, parameters={"columns": ["a", "c"]})
        self.assertEqual(tab3, tab1.select(("a", "c")))
        tab4 = self.butler.get(self.datasetType, dataId={}, parameters={"columns": "a"})
        self.assertEqual(tab4, tab1.select(("a",)))
        tab5 = self.butler.get(self.datasetType, dataId={}, parameters={"columns": ["index", "a"]})
        self.assertEqual(tab5, tab1.select(("index", "a")))
        tab6 = self.butler.get(self.datasetType, dataId={}, parameters={"columns": "ddd"})
        self.assertEqual(tab6, tab1.select(("ddd",)))
        # Requesting the same column twice yields it only once.
        tab7 = self.butler.get(self.datasetType, dataId={}, parameters={"columns": ["a", "a"]})
        self.assertEqual(tab7, tab1.select(("a",)))
        # Passing an unrecognized column should be a ValueError.
        with self.assertRaises(ValueError):
            self.butler.get(self.datasetType, dataId={}, parameters={"columns": ["e"]})


@unittest.skipUnless(np is not None, "Cannot test compute_row_group_size without numpy.")
@unittest.skipUnless(pa is not None, "Cannot test compute_row_group_size without pyarrow.")
class ComputeRowGroupSizeTestCase(unittest.TestCase):
Expand Down

0 comments on commit 10f9069

Please sign in to comment.