add method to upload sources from schema and metadata #82

Closed
wants to merge 6 commits into from
Changes from 3 commits
1 change: 1 addition & 0 deletions src/pycrunch/elements.py
@@ -283,6 +283,7 @@ class ElementResponseHandler(lemonpy.ResponseHandler):
    parsers = {"application/json": parse_json_element_from_response}

    def status_401(self, r):
        print(r.json())
Contributor

Remove this print

        login_url = r.json()["urls"]["login_url"]
        if r.request.url == login_url:
            # This means that the requests was made on the login_url, so it was
21 changes: 21 additions & 0 deletions src/pycrunch/importing.py
@@ -52,6 +52,27 @@ def wait_for_batch_status(self, batch, status):
        else:
            raise ValueError("The batch did not reach the '%s' state in the "
                             "given time. Please check again later." % status)

    def add_schema_metadata(self, ds, source_url, filename, fp, mimetype, schema, metadata):
Contributor

You don't need the dataset here. It's only being used for the session, which I believe could be obtained from the connection.
The source_url is not user input either: you get it from the main API catalog, under catalogs.sources.

This signature should be:

Suggested change
- def add_schema_metadata(self, ds, source_url, filename, fp, mimetype, schema, metadata):
+ def add_schema_metadata(self, schema, metadata, filename, fp, mimetype):

I think that in this case, since it's going to be Crunch Lake, the mimetype is always known because it has to be a parquet file. We don't accept anything else, do we?

Note that schema and metadata come first, because they are what the function's name is about.

Additionally, add a docstring stating the types of the arguments. In particular, are schema and metadata dictionaries, or are they file pointers to my local JSON files?
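
To make the request concrete, here is a rough sketch of the kind of signature and docstring I have in mind. It assumes, purely for illustration, that schema and metadata are dictionaries serialized to JSON and that the parquet mimetype is fixed. The names build_schema_metadata_upload and PARQUET_MIMETYPE are hypothetical, and the helper only builds the request arguments without sending anything. The crunchlake-specific fields are left out because their meaning is still open.

```python
import io
import json

# Assumption: parquet is the only accepted format, so the mimetype is fixed.
PARQUET_MIMETYPE = "application/vnd.apache.parquet"


def build_schema_metadata_upload(schema, metadata, filename, fp):
    """Build the multipart arguments for a schema/metadata source upload.

    :param schema: dict describing the columns (assumed to be a dict).
    :param metadata: dict with the variable metadata (assumed to be a dict).
    :param filename: name to report for the uploaded file.
    :param fp: binary file-like object open on the parquet file.
    :returns: dict with ``files`` and ``data`` entries, shaped like the
        keyword arguments accepted by a requests-style ``session.post``.
    """
    if not isinstance(schema, dict) or not isinstance(metadata, dict):
        raise TypeError("schema and metadata must be dictionaries")
    return {
        "files": {"uploaded_file": (filename, fp, PARQUET_MIMETYPE)},
        "data": {
            "schema": json.dumps(schema),
            "metadata": json.dumps(metadata),
        },
    }


# Illustrative call with made-up schema and metadata values.
kwargs = build_schema_metadata_upload(
    {"age": "int64"}, {"age": {"name": "Age"}}, "survey.parquet", io.BytesIO(b"PAR1")
)
```

If schema and metadata turn out to be file pointers instead, the docstring is the place to say so, and the type check above would change accordingly.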

        response = ds.session.post(
            source_url,
            files={
                "uploaded_file": (filename, fp, mimetype)
            },
            data={
                "schema": schema,
                "metadata": metadata,
                "crunchlake": "create",
Contributor

What is this argument?

Author

It's crunchlake specific. @mgdotdev can expand on this.

                "dataset_id": ds.get("self", "").split("/")[-2]
            }
        )

        return shoji.Entity(ds.session, body={
            "status_code": response.status_code,
Contributor

These aren't the attributes of a new Source entity.

The body attributes should be the same ones you would get when reading a Source from the API. Right after creation you won't have them. The response will probably be a 201 with a Location header.

So the only attribute you should be able to fill from there is self.

See how it's being done here:
https://github.com/Crunch-io/pycrunch/blob/master/src/pycrunch/shoji.py#L180

entity["self"] = URL(seeother[-1].headers["Location"], "")

It is fine for the Entity you return to be empty as long as it has a self URL.

The consumer can then call source.refresh() to trigger a new GET request and fetch all the attributes if they want them.
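
The idea can be sketched without pycrunch at all. The snippet below uses stand-in classes (FakeResponse and the dict-based Entity are hypothetical, not pycrunch's shoji.Entity) to show an entity that carries only its self URL, taken from the Location header of a 201 response.

```python
from urllib.parse import urljoin


class FakeResponse:
    """Stand-in for an HTTP response; not a real requests.Response."""

    def __init__(self, status_code, headers, url):
        self.status_code = status_code
        self.headers = headers
        self.url = url


class Entity(dict):
    """Stand-in for a Shoji entity: a dict that knows its own URL."""

    @property
    def self_url(self):
        return self["self"]


def entity_from_created(response):
    """Return an otherwise empty Entity pointing at the created resource."""
    if response.status_code != 201:
        raise ValueError("expected 201 Created, got %s" % response.status_code)
    location = response.headers["Location"]
    # Location may be relative, so resolve it against the request URL.
    return Entity({"self": urljoin(response.url, location)})


# Made-up URLs, used only to exercise the helper.
resp = FakeResponse(
    201,
    {"Location": "/api/sources/abc123/"},
    "https://example.test/api/sources/",
)
source = entity_from_created(resp)
```

Here source holds nothing but its self URL, which is all a later GET needs in order to load the real attributes.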

            "payload": response.payload,
            "source_url": response.headers.get("Location")
            }
        )

    def add_source(self, ds, filename, fp, mimetype):
        """Create a new Source on the given dataset and return its URL."""
20 changes: 20 additions & 0 deletions tests/test_lake.py
@@ -0,0 +1,20 @@
import json
import subprocess
import time


class TestCrunchLakeWorkflow:
    def test_ping(self):
        command = ["cr.core.launch", "ping", "crunch-lake-ping", '--kwargs={"pong": "test-ping"}']
        p = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
Contributor

What is this test?

Author

This is to check whether we have a connection to the lake service from pycrunch.

Contributor

So it's just a temporary test? It doesn't feel like this is the right place.

Contributor

@Alig1493 something to note is that pycrunch is an open source consumer library. It does not communicate with our internals at Crunch.

The tests we implement here should be able to run stand-alone. We can do the integration tests in other, internal repositories.
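
A stand-alone test in that spirit might look like the sketch below. It replaces the HTTP session with unittest.mock.Mock so nothing leaves the process. upload_source and the URLs are hypothetical placeholders, not pycrunch functions.

```python
import io
from unittest import mock


def upload_source(session, sources_url, filename, fp, mimetype):
    """Hypothetical uploader: POST the file and return the new source URL."""
    response = session.post(
        sources_url, files={"uploaded_file": (filename, fp, mimetype)}
    )
    return response.headers["Location"]


def test_upload_source_posts_file_and_returns_location():
    # The mocked session records the call instead of touching the network.
    session = mock.Mock()
    session.post.return_value.headers = {
        "Location": "https://example.test/sources/1/"
    }
    fp = io.BytesIO(b"PAR1")

    url = upload_source(
        session,
        "https://example.test/sources/",
        "data.parquet",
        fp,
        "application/octet-stream",
    )

    assert url == "https://example.test/sources/1/"
    session.post.assert_called_once_with(
        "https://example.test/sources/",
        files={"uploaded_file": ("data.parquet", fp, "application/octet-stream")},
    )


# Run directly so the sketch works without a test runner.
test_upload_source_posts_file_and_returns_location()
```

Because the session is a mock, this runs anywhere the library is installed, with no access to internal services.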

        start = time.time()
        stdout = p.stdout.readline()
        stderr = p.stderr.readline()

        while time.time() - start < 30:
            try:
                assert "Workflow launched with id" in stderr.decode("utf-8")
            except AssertionError as assertion_error:
                if not time.time() - start < 30:
                    raise assertion_error
                time.sleep(.01)