diff --git a/src/pycrunch/importing.py b/src/pycrunch/importing.py index 4cbb5d1..a119d73 100644 --- a/src/pycrunch/importing.py +++ b/src/pycrunch/importing.py @@ -53,9 +53,23 @@ def wait_for_batch_status(self, batch, status): raise ValueError("The batch did not reach the '%s' state in the " "given time. Please check again later." % status) - def add_schema_metadata(self, ds, source_url, filename, fp, mimetype, schema, metadata): - response = ds.session.post( - source_url, + def add_schema_metadata(self, site, schema, metadata, filename, fp, mimetype="application/x-parquet"): + """ + Create a new Source from a parquet file using schema and metadata. + + Parameters: + site (shoji.Catalog): a shoji Catalog object, from which we acquire session and sources url + schema (str): json string containing schema + metadata (str): json string containing metadata + filename (str): name of file being uploaded + fp (BufferedReader): opened file object + mimetype (str): mimetype of file being uploaded + + Returns: + shoji.Entity: Shoji entity containing the payload, status_code and source_url of uploaded file + """ + response = site.session.post( + site.catalogs.sources, files={ "uploaded_file": (filename, fp, mimetype) }, @@ -63,11 +77,10 @@ def add_schema_metadata(self, ds, source_url, filename, fp, mimetype, schema, me "schema": schema, "metadata": metadata, "crunchlake": "create", - "dataset_id": ds.get("self", "").split("/")[-2] + "dataset_id": "None" } ) - - return shoji.Entity(ds.session, body={ + return shoji.Entity(site.session, body={ "status_code": response.status_code, "payload": response.payload, "source_url": response.headers.get("Location") diff --git a/tests/test_lake.py b/tests/test_lake.py deleted file mode 100644 index 01424d1..0000000 --- a/tests/test_lake.py +++ /dev/null @@ -1,20 +0,0 @@ -import json -import subprocess -import time - - -class TestCrunchLakeWorkflow: - def test_ping(self): - command = ["cr.core.launch", "ping", "crunch-lake-ping", '--kwargs={"pong": "test-ping"}'] - p = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - start = time.time() - stdout = p.stdout.readline() - stderr = p.stderr.readline() - - while time.time() - start < 30: - try: - assert "Workflow launched with id" in stderr.decode("utf-8") - except AssertionError as assertion_error: - if not time.time() - start < 30: - raise assertion_error - time.sleep(.01)