From a4ea5d01953fdbb3aa9a35602ccaf48f60446dd2 Mon Sep 17 00:00:00 2001
From: Mohammed Ali Zubair
Date: Thu, 4 Apr 2024 03:22:54 -0400
Subject: [PATCH 1/6] add ping test for crunchlake consumers

---
 tests/test_lake.py | 20 ++++++++++++++++++++
 1 file changed, 20 insertions(+)
 create mode 100644 tests/test_lake.py

diff --git a/tests/test_lake.py b/tests/test_lake.py
new file mode 100644
index 0000000..01424d1
--- /dev/null
+++ b/tests/test_lake.py
@@ -0,0 +1,20 @@
+import json
+import subprocess
+import time
+
+
+class TestCrunchLakeWorkflow:
+    def test_ping(self):
+        command = ["cr.core.launch", "ping", "crunch-lake-ping", '--kwargs={"pong": "test-ping"}']
+        p = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+        start = time.time()
+        stdout = p.stdout.readline()
+        stderr = p.stderr.readline()
+
+        while time.time() - start < 30:
+            try:
+                assert "Workflow launched with id" in stderr.decode("utf-8")
+            except AssertionError as assertion_error:
+                if not time.time() - start < 30:
+                    raise assertion_error
+            time.sleep(.01)

From 77e2c0dbc451afebb8277045932cca9093b36fed Mon Sep 17 00:00:00 2001
From: Mohammed Ali Zubair
Date: Wed, 17 Apr 2024 12:34:54 -0400
Subject: [PATCH 2/6] add source post with schema and metadata params. add
 debug method for sources payload

---
 src/pycrunch/importing.py | 19 +++++++++++++++++++
 1 file changed, 19 insertions(+)

diff --git a/src/pycrunch/importing.py b/src/pycrunch/importing.py
index 06bf6f5..2d4a647 100644
--- a/src/pycrunch/importing.py
+++ b/src/pycrunch/importing.py
@@ -52,6 +52,25 @@ def wait_for_batch_status(self, batch, status):
         else:
             raise ValueError("The batch did not reach the '%s' state in the "
                              "given time. Please check again later." % status)
+
+    def add_schema_metadata(self, ds, source_url, filename, fp, mimetype, schema, metadata):
+        response = ds.session.post(
+            source_url,
+            files={
+                "uploaded_file": (filename, fp, mimetype)
+            },
+            data={
+                "schema": schema,
+                "metadata": metadata,
+            }
+        )
+        return response
+
+    def debug_source_payload(self, ds, source_url):
+        r = ds.session.get(source_url)
+        if r.payload is None:
+            raise TypeError("Response could not be parsed.", r)
+        return r.payload
 
     def add_source(self, ds, filename, fp, mimetype):
         """Create a new Source on the given dataset and return its URL."""

From e550231b5e6feb0095f5baf3c122aaba2e0bad28 Mon Sep 17 00:00:00 2001
From: Mohammed Ali Zubair
Date: Wed, 24 Apr 2024 12:03:49 -0400
Subject: [PATCH 3/6] finalize payload and response for sources, schema and
 metadata upload

---
 src/pycrunch/elements.py  |  1 +
 src/pycrunch/importing.py | 16 +++++++++-------
 2 files changed, 10 insertions(+), 7 deletions(-)

diff --git a/src/pycrunch/elements.py b/src/pycrunch/elements.py
index c4a3053..cb8d61e 100644
--- a/src/pycrunch/elements.py
+++ b/src/pycrunch/elements.py
@@ -283,6 +283,7 @@ class ElementResponseHandler(lemonpy.ResponseHandler):
     parsers = {"application/json": parse_json_element_from_response}
 
     def status_401(self, r):
+        print(r.json())
         login_url = r.json()["urls"]["login_url"]
         if r.request.url == login_url:
             # This means that the requests was made on the login_url, so it was

diff --git a/src/pycrunch/importing.py b/src/pycrunch/importing.py
index 2d4a647..4cbb5d1 100644
--- a/src/pycrunch/importing.py
+++ b/src/pycrunch/importing.py
@@ -62,15 +62,17 @@ def add_schema_metadata(self, ds, source_url, filename, fp, mimetype, schema, me
             data={
                 "schema": schema,
                 "metadata": metadata,
+                "crunchlake": "create",
+                "dataset_id": ds.get("self", "").split("/")[-2]
+            }
+        )
+
+        return shoji.Entity(ds.session, body={
+            "status_code": response.status_code,
+            "payload": response.payload,
+            "source_url": response.headers.get("Location")
             }
         )
-        return response
-
-    def debug_source_payload(self, ds, source_url):
-        r = ds.session.get(source_url)
-        if r.payload is None:
-            raise TypeError("Response could not be parsed.", r)
-        return r.payload
 
     def add_source(self, ds, filename, fp, mimetype):
         """Create a new Source on the given dataset and return its URL."""
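For reference, the upload that add_schema_metadata performs at this point in the series is roughly the multipart POST below. This is only an illustration: the sources URL, dataset id, file name, and the schema/metadata strings are placeholders, and a plain requests call stands in for the authenticated pycrunch session.

import requests

# Placeholder values; in pycrunch these come from the dataset entity and its session.
source_url = "https://app.crunch.io/api/sources/"
dataset_id = "223fd4"

with open("data.parquet", "rb") as fp:
    response = requests.post(
        source_url,
        # The parquet file travels as a multipart file field...
        files={"uploaded_file": ("data.parquet", fp, "application/x-parquet")},
        # ...alongside plain form fields carrying the schema, metadata and CrunchLake routing info.
        data={
            "schema": '{"columns": []}',
            "metadata": '{"name": "demo"}',
            "crunchlake": "create",
            "dataset_id": dataset_id,
        },
    )

print(response.status_code, response.headers.get("Location"))

The Location header is what the later patches end up exposing to callers, which is why it shows up in the sketch above.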
From 8d9e92488a5261a1bbe1f93cabc28e8bd60bca93 Mon Sep 17 00:00:00 2001
From: Mohammed Ali Zubair
Date: Thu, 25 Apr 2024 07:34:52 -0400
Subject: [PATCH 4/6] amend add_schema_metadata to only accept required
 params. add docstring. move test_lake from pycrunch tests to functests in
 zoom

---
 src/pycrunch/importing.py | 25 +++++++++++++++++++------
 tests/test_lake.py        | 20 --------------------
 2 files changed, 19 insertions(+), 26 deletions(-)
 delete mode 100644 tests/test_lake.py

diff --git a/src/pycrunch/importing.py b/src/pycrunch/importing.py
index 4cbb5d1..a119d73 100644
--- a/src/pycrunch/importing.py
+++ b/src/pycrunch/importing.py
@@ -53,9 +53,23 @@ def wait_for_batch_status(self, batch, status):
             raise ValueError("The batch did not reach the '%s' state in the "
                              "given time. Please check again later." % status)
 
-    def add_schema_metadata(self, ds, source_url, filename, fp, mimetype, schema, metadata):
-        response = ds.session.post(
-            source_url,
+    def add_schema_metadata(self, site, schema, metadata, filename, fp, mimetype="application/x-parquet"):
+        """
+        Create a new Source from a parquet file using schema and metadata.
+
+        Parameters:
+            site (shoji.Catalog): a shoji Catalog object, from which we acquire session and sources url
+            schema (str): json string containing schema
+            metadata (str): json string containing metadata
+            filename (str): name of file being uploaded
+            fp (BufferedReader): opened file object
+            mimetype (str): mimetype of file being uploaded
+
+        Returns:
+            shoji.Entity: Shoji entity containing the payload, status_code and source_url of uploaded file
+        """
+        response = site.session.post(
+            site.catalogs.sources,
             files={
                 "uploaded_file": (filename, fp, mimetype)
             },
@@ -63,11 +77,10 @@ def add_schema_metadata(self, ds, source_url, filename, fp, mimetype, schema, me
                 "schema": schema,
                 "metadata": metadata,
                 "crunchlake": "create",
-                "dataset_id": ds.get("self", "").split("/")[-2]
+                "dataset_id": "None"
             }
         )
-
-        return shoji.Entity(ds.session, body={
+        return shoji.Entity(site.session, body={
             "status_code": response.status_code,
             "payload": response.payload,
             "source_url": response.headers.get("Location")

diff --git a/tests/test_lake.py b/tests/test_lake.py
deleted file mode 100644
index 01424d1..0000000
--- a/tests/test_lake.py
+++ /dev/null
@@ -1,20 +0,0 @@
-import json
-import subprocess
-import time
-
-
-class TestCrunchLakeWorkflow:
-    def test_ping(self):
-        command = ["cr.core.launch", "ping", "crunch-lake-ping", '--kwargs={"pong": "test-ping"}']
-        p = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
-        start = time.time()
-        stdout = p.stdout.readline()
-        stderr = p.stderr.readline()
-
-        while time.time() - start < 30:
-            try:
-                assert "Workflow launched with id" in stderr.decode("utf-8")
-            except AssertionError as assertion_error:
-                if not time.time() - start < 30:
-                    raise assertion_error
-            time.sleep(.01)

From f7d05466bb7455a8033f87ab3a4f5b2d52e55722 Mon Sep 17 00:00:00 2001
From: Mohammed Ali Zubair
Date: Thu, 25 Apr 2024 10:04:07 -0400
Subject: [PATCH 5/6] remove stray print. return result as shoji.Entity

---
 src/pycrunch/elements.py  | 1 -
 src/pycrunch/importing.py | 9 ++-------
 2 files changed, 2 insertions(+), 8 deletions(-)

diff --git a/src/pycrunch/elements.py b/src/pycrunch/elements.py
index cb8d61e..c4a3053 100644
--- a/src/pycrunch/elements.py
+++ b/src/pycrunch/elements.py
@@ -283,7 +283,6 @@ class ElementResponseHandler(lemonpy.ResponseHandler):
     parsers = {"application/json": parse_json_element_from_response}
 
     def status_401(self, r):
-        print(r.json())
         login_url = r.json()["urls"]["login_url"]
         if r.request.url == login_url:
             # This means that the requests was made on the login_url, so it was

diff --git a/src/pycrunch/importing.py b/src/pycrunch/importing.py
index a119d73..6a9a838 100644
--- a/src/pycrunch/importing.py
+++ b/src/pycrunch/importing.py
@@ -66,7 +66,7 @@ def add_schema_metadata(self, site, schema, metadata, filename, fp, mimetype="ap
             mimetype (str): mimetype of file being uploaded
 
         Returns:
-            shoji.Entity: Shoji entity containing the payload, status_code and source_url of uploaded file
+            shoji.Entity: Shoji entity containing the source url
         """
         response = site.session.post(
             site.catalogs.sources,
@@ -80,12 +80,7 @@ def add_schema_metadata(self, site, schema, metadata, filename, fp, mimetype="ap
                 "dataset_id": "None"
             }
         )
-        return shoji.Entity(site.session, body={
-            "status_code": response.status_code,
-            "payload": response.payload,
-            "source_url": response.headers.get("Location")
-            }
-        )
+        return shoji.Entity(self=response.headers.get("Location"), session=site.session)
 
     def add_source(self, ds, filename, fp, mimetype):
         """Create a new Source on the given dataset and return its URL."""
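One way to exercise the method as it stands after this patch is with a mocked site, along the lines of the pytest-style sketch below. The URLs, file contents, and schema/metadata strings are illustrative, the test relies on unittest.mock, and it is not part of the patch series itself.

import io
from unittest import mock

from pycrunch.importing import Importer


def test_add_schema_metadata_posts_schema_and_metadata():
    # Fake shoji catalog: only the attributes the method touches are provided.
    site = mock.Mock()
    site.catalogs.sources = "https://app.crunch.io/api/sources/"
    site.session.post.return_value = mock.Mock(
        headers={"Location": "https://app.crunch.io/api/sources/abc123/"}
    )

    entity = Importer().add_schema_metadata(
        site,
        schema='{"columns": []}',       # still a pre-serialized JSON string at this stage
        metadata='{"name": "demo"}',
        filename="demo.parquet",
        fp=io.BytesIO(b"fake parquet bytes"),
    )

    # The upload should target the sources catalog with the expected form fields.
    args, kwargs = site.session.post.call_args
    assert args[0] == "https://app.crunch.io/api/sources/"
    assert kwargs["files"]["uploaded_file"][0] == "demo.parquet"
    assert kwargs["data"]["crunchlake"] == "create"
    # The returned entity's self URL should come from the Location header.
    assert entity["self"] == "https://app.crunch.io/api/sources/abc123/"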
From 01f2a2de7a48f38a8612b296e567151383e24399 Mon Sep 17 00:00:00 2001
From: Mohammed Ali Zubair
Date: Thu, 25 Apr 2024 16:05:40 -0400
Subject: [PATCH 6/6] add json formatting

---
 src/pycrunch/importing.py | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/src/pycrunch/importing.py b/src/pycrunch/importing.py
index 6a9a838..2fdfd24 100644
--- a/src/pycrunch/importing.py
+++ b/src/pycrunch/importing.py
@@ -59,8 +59,8 @@ def add_schema_metadata(self, site, schema, metadata, filename, fp, mimetype="ap
 
         Parameters:
             site (shoji.Catalog): a shoji Catalog object, from which we acquire session and sources url
-            schema (str): json string containing schema
-            metadata (str): json string containing metadata
+            schema (dict): dict containing the schema
+            metadata (dict): dict containing the metadata
             filename (str): name of file being uploaded
             fp (BufferedReader): opened file object
             mimetype (str): mimetype of file being uploaded
@@ -74,8 +74,8 @@ def add_schema_metadata(self, site, schema, metadata, filename, fp, mimetype="ap
                 "uploaded_file": (filename, fp, mimetype)
             },
             data={
-                "schema": schema,
-                "metadata": metadata,
+                "schema": json.dumps(schema),
+                "metadata": json.dumps(metadata),
                 "crunchlake": "create",
                 "dataset_id": "None"
             }
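Taken together, the series leaves the importer with a parquet-based entry point that accepts plain dicts. A minimal end-to-end usage sketch against that final state follows; it assumes a reachable Crunch API with password-based login via pycrunch.connect, and the credentials, API root, file path, and the schema/metadata dicts are placeholders whose exact shape depends on the CrunchLake service.

import pycrunch
from pycrunch.importing import Importer

# Placeholder credentials and API root.
site = pycrunch.connect("me@example.com", "yourpassword", "https://app.crunch.io/api/")

# Illustrative schema/metadata dicts; the method JSON-encodes them before upload.
schema = {"columns": [{"alias": "pong", "type": "text"}]}
metadata = {"name": "CrunchLake upload", "description": "demo"}

with open("data.parquet", "rb") as fp:
    source = Importer().add_schema_metadata(
        site, schema, metadata, "data.parquet", fp,
        mimetype="application/x-parquet",
    )

# The entity's self URL is taken from the Location header of the source POST.
print(source["self"])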