From 54bc3e0245aecd362ef58d82c6e40344fa845c1e Mon Sep 17 00:00:00 2001 From: hallvictoria Date: Thu, 10 Oct 2024 13:46:23 -0500 Subject: [PATCH] add eg sourced blob functions --- .../blob_functions_stein/function_app.py | 52 +++++++++++++++++++ 1 file changed, 52 insertions(+) diff --git a/tests/endtoend/blob_functions/blob_functions_stein/function_app.py b/tests/endtoend/blob_functions/blob_functions_stein/function_app.py index 2621a336..ea872bde 100644 --- a/tests/endtoend/blob_functions/blob_functions_stein/function_app.py +++ b/tests/endtoend/blob_functions/blob_functions_stein/function_app.py @@ -391,3 +391,55 @@ def put_get_multiple_blobs_as_bytes_return_http_response( mimetype="application/json", status_code=200 ) + + +@app.function_name(name="blob_trigger_default_source_enum") +@app.blob_trigger(arg_name="file", + path="python-worker-tests/test-blob-trigger.txt", + connection="AzureWebJobsStorage", + source=func.BlobSource.LOGS_AND_CONTAINER_SCAN) +def blob_trigger_default_source_enum(file: func.InputStream) -> str: + return json.dumps({ + 'name': file.name, + 'length': file.length, + 'content': file.read().decode('utf-8') + }) + + +@app.function_name(name="blob_trigger_eventgrid_source_enum") +@app.blob_trigger(arg_name="file", + path="python-worker-tests/test-blob-trigger.txt", + connection="AzureWebJobsStorage", + source=func.BlobSource.EVENT_GRID) +def blob_trigger_eventgrid_source_enum(file: func.InputStream) -> str: + return json.dumps({ + 'name': file.name, + 'length': file.length, + 'content': file.read().decode('utf-8') + }) + + +@app.function_name(name="blob_trigger_default_source_str") +@app.blob_trigger(arg_name="file", + path="python-worker-tests/test-blob-trigger.txt", + connection="AzureWebJobsStorage", + source="LogsAndContainerScan") +def blob_trigger_default_source_str(file: func.InputStream) -> str: + return json.dumps({ + 'name': file.name, + 'length': file.length, + 'content': file.read().decode('utf-8') + }) + + +@app.function_name(name="blob_trigger_eventgrid_source_str") +@app.blob_trigger(arg_name="file", + path="python-worker-tests/test-blob-trigger.txt", + connection="AzureWebJobsStorage", + source="EventGrid") +def blob_trigger_eventgrid_source_str(file: func.InputStream) -> str: + return json.dumps({ + 'name': file.name, + 'length': file.length, + 'content': file.read().decode('utf-8') + })