diff --git a/docs/sdk/code-reference/pipelines/destinations/python/delta.md b/docs/sdk/code-reference/pipelines/destinations/python/delta.md new file mode 100644 index 000000000..dbf0039a5 --- /dev/null +++ b/docs/sdk/code-reference/pipelines/destinations/python/delta.md @@ -0,0 +1,2 @@ +# Write to Delta +::: src.sdk.python.rtdip_sdk.pipelines.destinations.python.delta \ No newline at end of file diff --git a/docs/sdk/code-reference/pipelines/sources/python/delta.md b/docs/sdk/code-reference/pipelines/sources/python/delta.md new file mode 100644 index 000000000..21e5b2523 --- /dev/null +++ b/docs/sdk/code-reference/pipelines/sources/python/delta.md @@ -0,0 +1,2 @@ +# Read from Delta +::: src.sdk.python.rtdip_sdk.pipelines.sources.python.delta \ No newline at end of file diff --git a/docs/sdk/code-reference/pipelines/sources/python/delta_sharing.md b/docs/sdk/code-reference/pipelines/sources/python/delta_sharing.md new file mode 100644 index 000000000..0668d8a60 --- /dev/null +++ b/docs/sdk/code-reference/pipelines/sources/python/delta_sharing.md @@ -0,0 +1,2 @@ +# Read from Delta with Delta Sharing +::: src.sdk.python.rtdip_sdk.pipelines.sources.python.delta_sharing \ No newline at end of file diff --git a/docs/sdk/code-reference/pipelines/transformers/spark/raw_forecast_to_weather_data_model.md b/docs/sdk/code-reference/pipelines/transformers/spark/raw_forecast_to_weather_data_model.md new file mode 100644 index 000000000..47ede29bf --- /dev/null +++ b/docs/sdk/code-reference/pipelines/transformers/spark/raw_forecast_to_weather_data_model.md @@ -0,0 +1,2 @@ +# Convert Forecast Raw JSON data to the Weather Data Model +::: src.sdk.python.rtdip_sdk.pipelines.transformers.spark.raw_forecast_to_weather_data_model diff --git a/docs/sdk/pipelines/components.md b/docs/sdk/pipelines/components.md index 96be6188b..d6c7b5673 100644 --- a/docs/sdk/pipelines/components.md +++ b/docs/sdk/pipelines/components.md @@ -45,7 +45,6 @@ Sources are components that connect to source systems and extract data from them |[PJM Historical Load ISO](../code-reference/pipelines/sources/spark/iso/pjm_historical_load_iso.md) ||:heavy_check_mark:|:heavy_check_mark:|:heavy_check_mark:|:heavy_check_mark:| |[Weather Forecast API V1](../code-reference/pipelines/sources/spark/weather/weather_forecast_api_v1.md)||:heavy_check_mark:|:heavy_check_mark:|:heavy_check_mark:|:heavy_check_mark:| |[Weather Forecast API V1 Multi](../code-reference/pipelines/sources/spark/weather/weather_forecast_api_v1_multi.md)||:heavy_check_mark:|:heavy_check_mark:|:heavy_check_mark:|:heavy_check_mark:| - !!! note "Note" This list will dynamically change as the framework is further developed and new components are added. @@ -65,6 +64,7 @@ Transformers are components that perform transformations on data. 
These will tar |[Pandas to PySpark DataFrame Conversion](../code-reference/pipelines/transformers/spark/pandas_to_pyspark.md)||:heavy_check_mark:|:heavy_check_mark:|:heavy_check_mark:|:heavy_check_mark:| |[PySpark to Pandas DataFrame Conversion](../code-reference/pipelines/transformers/spark/pyspark_to_pandas.md)||:heavy_check_mark:|:heavy_check_mark:|:heavy_check_mark:|:heavy_check_mark:| |[MISO To Meters Data Model](../code-reference/pipelines/transformers/spark/iso/miso_to_mdm.md)||:heavy_check_mark:|:heavy_check_mark:|:heavy_check_mark:|:heavy_check_mark:| +|[Raw Forecast to Weather Data Model](../code-reference/pipelines/transformers/spark/raw_forecast_to_weather_data_model.md)||:heavy_check_mark:|:heavy_check_mark:|:heavy_check_mark:|:heavy_check_mark:| |[PJM To Meters Data Model](../code-reference/pipelines/transformers/spark/iso/pjm_to_mdm.md)||:heavy_check_mark:|:heavy_check_mark:|:heavy_check_mark:|:heavy_check_mark:| !!! note "Note" diff --git a/environment.yml b/environment.yml index 6e0359cce..5fd85d959 100644 --- a/environment.yml +++ b/environment.yml @@ -37,7 +37,7 @@ dependencies: - azure-keyvault-secrets==4.7.0 - boto3==1.28.2 - pyodbc==4.0.39 - - fastapi==0.100.0 + - fastapi==0.100.1 - httpx==0.24.1 - trio==0.22.1 - pyspark>=3.3.0,<3.5.0 @@ -53,19 +53,23 @@ dependencies: - mkdocs-macros-plugin==1.0.1 - pygments==2.15.1 - pymdown-extensions==10.0.1 - - databricks-sql-connector==2.7.0 + - databricks-sql-connector==2.8.0 - databricks-sdk==0.2.1 - semver==3.0.0 - xlrd==2.0.1 - pygithub==1.59.0 - strawberry-graphql[fastapi,pydantic]==0.194.4 - web3==6.5.0 + - twine==4.0.2 - pip: - dependency-injector==4.41.0 - azure-functions==1.15.0 - nest_asyncio==1.5.6 - hvac==1.1.1 - - langchain==0.0.230 + - langchain==0.0.247 - deltalake==0.10.0 - moto[s3]==4.1.13 + - build==0.10.0 + - polars==0.18.8 + - delta-sharing==0.7.3 \ No newline at end of file diff --git a/mkdocs.yml b/mkdocs.yml index 68d0b6043..e046b0df9 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -160,6 +160,9 @@ nav: - Base Weather: sdk/code-reference/pipelines/sources/spark/weather/base_weather.md - Weather Forecast API V1: sdk/code-reference/pipelines/sources/spark/weather/weather_forecast_api_v1.md - Weather Forecast API V1 Multi: sdk/code-reference/pipelines/sources/spark/weather/weather_forecast_api_v1_multi.md + - Python: + - Delta: sdk/code-reference/pipelines/sources/python/delta.md + - Delta Sharing: sdk/code-reference/pipelines/sources/python/delta_sharing.md - Transformers: - Spark: - Binary To String: sdk/code-reference/pipelines/transformers/spark/binary_to_string.md @@ -174,6 +177,9 @@ nav: - ISO: - MISO To Meters Data Model: sdk/code-reference/pipelines/transformers/spark/iso/miso_to_mdm.md - PJM To Meters Data Model: sdk/code-reference/pipelines/transformers/spark/iso/pjm_to_mdm.md + - Weather: + - Raw Forecast To Weather Data Model: sdk/code-reference/pipelines/transformers/spark/raw_forecast_to_weather_data_model.md + - Destinations: - Spark: - Delta: sdk/code-reference/pipelines/destinations/spark/delta.md @@ -183,6 +189,8 @@ nav: - Kinesis: sdk/code-reference/pipelines/destinations/spark/kinesis.md - Rest API: sdk/code-reference/pipelines/destinations/spark/rest_api.md - Process Control Data Model To Delta: sdk/code-reference/pipelines/destinations/spark/pcdm_to_delta.md + - Python: + - Delta: sdk/code-reference/pipelines/destinations/python/delta.md - Blockchain: - EVM: sdk/code-reference/pipelines/destinations/blockchain/evm.md diff --git a/setup.py b/setup.py index 36511b136..1f6e068b2 100644 --- 
a/setup.py +++ b/setup.py @@ -29,7 +29,7 @@ long_description = (here / "PYPI-README.md").read_text() INSTALL_REQUIRES = [ - "databricks-sql-connector==2.7.0", + "databricks-sql-connector==2.8.0", "azure-identity==1.12.0", "pyodbc==4.0.39", "pandas==1.5.2", @@ -40,7 +40,7 @@ "grpcio>=1.48.1", "grpcio-status>=1.48.1", "googleapis-common-protos>=1.56.4", - "langchain==0.0.230", + "langchain==0.0.247", "openai==0.27.8" ] diff --git a/src/api/FastAPIApp/__init__.py b/src/api/FastAPIApp/__init__.py index 677958b93..c7edaa35e 100644 --- a/src/api/FastAPIApp/__init__.py +++ b/src/api/FastAPIApp/__init__.py @@ -67,6 +67,7 @@ openapi_url="/api/openapi.json", docs_url=None, redoc_url=None, + license_info={"name": "Apache License 2.0", "identifier": "Apache-2.0"} ) app.add_middleware(GZipMiddleware, minimum_size=1000) diff --git a/src/api/README.md b/src/api/README.md index eed2e6ca9..4ff8d7683 100644 --- a/src/api/README.md +++ b/src/api/README.md @@ -35,6 +35,13 @@ Ensure that you setup the **local.settings.json** file with the relevant paramet |DATABRICKS_SQL_SERVER_HOSTNAME|adb-xxxxx.x.azuredatabricks.net| |DATABRICKS_SQL_HTTP_PATH|/sql/1.0/warehouses/xxx| +Please also ensure to install all the turbodbc requirements for your machine by reviewing the [installation instructions](https://turbodbc.readthedocs.io/en/latest/pages/getting_started.html) of turbodbc. On a macbook, this includes executing the following commands: + +```bash +brew install llvm +brew install boost +``` + ### Swagger and Redoc Fast API provides endpoints for Swagger and Redoc pages. Ensure that you review these pages after any updates to confirm they are working as expected. diff --git a/src/api/requirements.txt b/src/api/requirements.txt index b86bc0fab..cbcfd611d 100644 --- a/src/api/requirements.txt +++ b/src/api/requirements.txt @@ -1,12 +1,12 @@ # Do not include azure-functions-worker as it may conflict with the Azure Functions platform azure-functions==1.15.0 -fastapi==0.100.0 +fastapi==0.100.1 nest_asyncio==1.5.6 strawberry-graphql[fastapi,pydantic]==0.194.4 turbodbc==4.5.10 pyodbc==4.0.39 importlib_metadata>=1.0.0 -databricks-sql-connector==2.7.0 +databricks-sql-connector==2.8.0 azure-identity==1.12.0 oauthlib>=3.2.2 pandas==1.5.2 diff --git a/src/api/v1/graphql.py b/src/api/v1/graphql.py index abe7b961c..0327b1798 100644 --- a/src/api/v1/graphql.py +++ b/src/api/v1/graphql.py @@ -40,8 +40,8 @@ async def raw_get( data_type: str = Query(..., description="Data Type", examples={"float": {"value": "float"}, "integer": {"value": "integer"}, "string": {"value": "string"}}), tag_name: List[str] = Query(..., description="Tag Name"), include_bad_data: bool = Query(True, description="Include or remove Bad data points"), - start_date: date = Query(..., description="Start Date", example="2022-01-01"), - end_date: date = Query(..., description="End Date", example="2022-01-02"), + start_date: date = Query(..., description="Start Date", examples="2022-01-01"), + end_date: date = Query(..., description="End Date", examples="2022-01-02"), authorization: str = Header(None, include_in_schema=False), # authorization: str = Depends(oauth2_scheme) ) -> RawResponseQL: diff --git a/src/api/v1/interpolate.py b/src/api/v1/interpolate.py index 30a102182..045f449f5 100644 --- a/src/api/v1/interpolate.py +++ b/src/api/v1/interpolate.py @@ -43,7 +43,7 @@ def interpolate_events_get(base_query_parameters, raw_query_parameters, tag_quer get_description = """ ## Interpolate -Interpolation of raw timeseries data. 
Refer to the following [documentation](https://www.rtdip.io/sdk/code-reference/query/interpolate/) for further information. +Interpolation of raw timeseries data. """ @api_v1_router.get( @@ -51,7 +51,8 @@ def interpolate_events_get(base_query_parameters, raw_query_parameters, tag_quer name="Interpolate GET", description=get_description, tags=["Events"], - responses={200: {"model": ResampleInterpolateResponse}, 400: {"model": HTTPError}} + responses={200: {"model": ResampleInterpolateResponse}, 400: {"model": HTTPError}}, + openapi_extra={"externalDocs": {"description": "RTDIP Interpolation Query Documentation", "url": "https://www.rtdip.io/sdk/code-reference/query/interpolate/"}} ) async def interpolate_get( base_query_parameters: BaseQueryParams = Depends(), @@ -65,7 +66,7 @@ async def interpolate_get( post_description = """ ## Interpolate -Interpolation of raw timeseries data via a POST method to enable providing a list of tag names that can exceed url length restrictions via GET Query Parameters. Refer to the following [documentation](https://www.rtdip.io/sdk/code-reference/query/interpolate/) for further information. +Interpolation of raw timeseries data via a POST method to enable providing a list of tag names that can exceed url length restrictions via GET Query Parameters. """ @api_v1_router.post( @@ -73,7 +74,8 @@ async def interpolate_get( name="Interpolate POST", description=get_description, tags=["Events"], - responses={200: {"model": ResampleInterpolateResponse}, 400: {"model": HTTPError}} + responses={200: {"model": ResampleInterpolateResponse}, 400: {"model": HTTPError}}, + openapi_extra={"externalDocs": {"description": "RTDIP Interpolation Query Documentation", "url": "https://www.rtdip.io/sdk/code-reference/query/interpolate/"}} ) async def interpolate_post( base_query_parameters: BaseQueryParams = Depends(), diff --git a/src/api/v1/interpolation_at_time.py b/src/api/v1/interpolation_at_time.py index 03e345ade..762901bb2 100644 --- a/src/api/v1/interpolation_at_time.py +++ b/src/api/v1/interpolation_at_time.py @@ -41,7 +41,7 @@ def interpolation_at_time_events_get(base_query_parameters, tag_query_parameters get_description = """ ## Interpolation at Time -Interpolation at Time of raw timeseries data. Refer to the following [documentation](https://www.rtdip.io/sdk/code-reference/query/interpolation_at_time/) for further information. +Interpolation at Time of raw timeseries data. """ @api_v1_router.get( @@ -49,7 +49,8 @@ def interpolation_at_time_events_get(base_query_parameters, tag_query_parameters name="Interpolation at Time GET", description=get_description, tags=["Events"], - responses={200: {"model": ResampleInterpolateResponse}, 400: {"model": HTTPError}} + responses={200: {"model": ResampleInterpolateResponse}, 400: {"model": HTTPError}}, + openapi_extra={"externalDocs": {"description": "RTDIP Interpolation At Time Query Documentation", "url": "https://www.rtdip.io/sdk/code-reference/query/interpolate_at_time/"}} ) async def interpolate_get( base_query_parameters: BaseQueryParams = Depends(), @@ -61,7 +62,7 @@ async def interpolate_get( post_description = """ ## Interpolation at Time -Interpolation at time of raw timeseries data via a POST method to enable providing a list of tag names that can exceed url length restrictions via GET Query Parameters. Refer to the following [documentation](https://www.rtdip.io/sdk/code-reference/query/interpolation_at_time/) for further information. 
+Interpolation at time of raw timeseries data via a POST method to enable providing a list of tag names that can exceed url length restrictions via GET Query Parameters. """ @api_v1_router.post( @@ -69,7 +70,8 @@ async def interpolate_get( name="Interpolation at Time POST", description=get_description, tags=["Events"], - responses={200: {"model": ResampleInterpolateResponse}, 400: {"model": HTTPError}} + responses={200: {"model": ResampleInterpolateResponse}, 400: {"model": HTTPError}}, + openapi_extra={"externalDocs": {"description": "RTDIP Interpolation At Time Query Documentation", "url": "https://www.rtdip.io/sdk/code-reference/query/interpolate_at_time/"}} ) async def interpolate_post( base_query_parameters: BaseQueryParams = Depends(), diff --git a/src/api/v1/metadata.py b/src/api/v1/metadata.py index 60ad6b137..e27c068e8 100644 --- a/src/api/v1/metadata.py +++ b/src/api/v1/metadata.py @@ -39,7 +39,7 @@ def metadata_retrieval_get(query_parameters, metadata_query_parameters): get_description = """ ## Metadata -Retrieval of metadata, including UoM, Description and any other possible fields, if available. Refer to the following [documentation](https://www.rtdip.io/sdk/code-reference/query/metadata/) for further information. +Retrieval of metadata, including UoM, Description and any other possible fields, if available. """ @api_v1_router.get( @@ -48,7 +48,8 @@ def metadata_retrieval_get(query_parameters, metadata_query_parameters): description=get_description, tags=["Metadata"], dependencies=[Depends(oauth2_scheme)], - responses={200: {"model": MetadataResponse}, 400: {"model": HTTPError}} + responses={200: {"model": MetadataResponse}, 400: {"model": HTTPError}}, + openapi_extra={"externalDocs": {"description": "RTDIP Metadata Query Documentation", "url": "https://www.rtdip.io/sdk/code-reference/query/metadata/"}} ) async def metadata_get(query_parameters: BaseQueryParams = Depends(), metadata_query_parameters: MetadataQueryParams = Depends()): return metadata_retrieval_get(query_parameters, metadata_query_parameters) @@ -56,7 +57,7 @@ async def metadata_get(query_parameters: BaseQueryParams = Depends(), metadata_q post_description = """ ## Metadata -Retrieval of metadata, including UoM, Description and any other possible fields, if available via a POST method to enable providing a list of tag names that can exceed url length restrictions via GET Query Parameters. Refer to the following [documentation](https://www.rtdip.io/sdk/code-reference/query/metadata/) for further information. +Retrieval of metadata, including UoM, Description and any other possible fields, if available via a POST method to enable providing a list of tag names that can exceed url length restrictions via GET Query Parameters. 
""" @api_v1_router.post( @@ -65,7 +66,8 @@ async def metadata_get(query_parameters: BaseQueryParams = Depends(), metadata_q description=post_description, tags=["Metadata"], dependencies=[Depends(oauth2_scheme)], - responses={200: {"model": MetadataResponse}, 400: {"model": HTTPError}} + responses={200: {"model": MetadataResponse}, 400: {"model": HTTPError}}, + openapi_extra={"externalDocs": {"description": "RTDIP Metadata Query Documentation", "url": "https://www.rtdip.io/sdk/code-reference/query/metadata/"}} ) async def metadata_post(query_parameters: BaseQueryParams = Depends(), metadata_query_parameters: TagsBodyParams = Body(default=...)): return metadata_retrieval_get(query_parameters, metadata_query_parameters) \ No newline at end of file diff --git a/src/api/v1/models.py b/src/api/v1/models.py index 33a3ee5f6..ac68e2e73 100644 --- a/src/api/v1/models.py +++ b/src/api/v1/models.py @@ -113,10 +113,10 @@ def __init__( class RawQueryParams: def __init__( self, - data_type: str = Query(..., description="Data Type can be one of the following options:[float, double, integer, string]"), + data_type: str = Query(..., description="Data Type can be one of the following options: float, double, integer, string", examples=["float", "double", "integer", "string"]), include_bad_data: bool = Query(..., description="Include or remove Bad data points"), - start_date: Union[date, datetime] = Query(..., description="Start Date in format YYYY-MM-DD or YYYY-MM-DDTHH:mm:ss or YYYY-MM-DDTHH:mm:ss+zz:zz", examples={"2022-01-01": {"value": "2022-01-01"}, "2022-01-01T15:00:00": {"value": "2022-01-01T15:00:00"}, "2022-01-01T15:00:00+00:00": {"value": "2022-01-01T15:00:00+00:00"}}), - end_date: Union[date, datetime] = Query(..., description="End Date in format YYYY-MM-DD or YYYY-MM-DDTHH:mm:ss or YYYY-MM-DDTHH:mm:ss+zz:zz", examples={"2022-01-02": {"value": "2022-01-02"}, "2022-01-01T16:00:00": {"value": "2022-01-01T16:00:00"}, "2022-01-01T15:00:00+00:00": {"value": "2022-01-01T15:00:00+00:00"}}), + start_date: Union[date, datetime] = Query(..., description="Start Date in format YYYY-MM-DD or YYYY-MM-DDTHH:mm:ss or YYYY-MM-DDTHH:mm:ss+zz:zz", examples=["2022-01-01", "2022-01-01T15:00:00", "2022-01-01T15:00:00+00:00"]), + end_date: Union[date, datetime] = Query(..., description="End Date in format YYYY-MM-DD or YYYY-MM-DDTHH:mm:ss or YYYY-MM-DDTHH:mm:ss+zz:zz", examples=["2022-01-02", "2022-01-01T16:00:00", "2022-01-01T15:00:00+00:00"]), ): self.data_type = data_type self.include_bad_data = include_bad_data @@ -136,11 +136,11 @@ class TagsBodyParams(BaseModel): class ResampleQueryParams: def __init__( self, - sample_rate: str = Query(..., description="sample_rate is deprecated and will be removed in v1.0.0. Please use time_interval_rate instead.", example=5, deprecated=True), - sample_unit: str = Query(..., description="sample_unit is deprecated and will be removed in v1.0.0. 
Please use time_interval_unit instead.", examples={"second": {"value": "second"}, "minute": {"value": "minute"}, "hour": {"value": "hour"}, "day": {"value": "day"}}, deprecated=True), - time_interval_rate: str = Query(..., description="Time Interval Rate as a numeric input", example=5), - time_interval_unit: str = Query(..., description="Time Interval Unit can be one of the options: [second, minute, day, hour]", examples={"second": {"value": "second"}, "minute": {"value": "minute"}, "hour": {"value": "hour"}, "day": {"value": "day"}}), - agg_method: str = Query(..., description="Aggregation Method can be one of the following [first, last, avg, min, max]", examples={"first": {"value": "first"}, "last": {"value": "last"}, "avg": {"value": "avg"}, "min": {"value": "min"}, "max": {"value": "max"}}), + sample_rate: str = Query(..., description="sample_rate is deprecated and will be removed in v1.0.0. Please use time_interval_rate instead.", examples=[5], deprecated=True), + sample_unit: str = Query(..., description="sample_unit is deprecated and will be removed in v1.0.0. Please use time_interval_unit instead.", examples=["second", "minute", "hour", "day"], deprecated=True), + time_interval_rate: str = Query(..., description="Time Interval Rate as a numeric input", examples=[5]), + time_interval_unit: str = Query(..., description="Time Interval Unit can be one of the options: [second, minute, day, hour]", examples=["second", "minute", "hour", "day"]), + agg_method: str = Query(..., description="Aggregation Method can be one of the following [first, last, avg, min, max]", examples=["first", "last", "avg", "min", "max"]), ): self.sample_rate = sample_rate self.sample_unit = sample_unit @@ -151,7 +151,7 @@ def __init__( class InterpolateQueryParams: def __init__( self, - interpolation_method: str = Query(..., description="Interpolation Method can be forward_fill or backward_fill", examples={"forward_fill": {"value": "forward_fill"}, "backward_fill": {"value": "backward_fill"}}), + interpolation_method: str = Query(..., description="Interpolation Method can be forward_fill or backward_fill", examples=["forward_fill", "backward_fill"]), ): self.interpolation_method = interpolation_method @@ -159,8 +159,8 @@ class InterpolationAtTimeQueryParams: def __init__( self, data_type: str = Query(..., description="Data Type can be one of the following options:[float, double, integer, string]"), - timestamps: List[Union[date, datetime]] = Query(..., description="Timestamps in format YYYY-MM-DD or YYYY-MM-DDTHH:mm:ss or YYYY-MM-DDTHH:mm:ss+zz:zz", examples={"2022-01-01": {"value": "2022-01-01"}, "2022-01-01T15:00:00": {"value": "2022-01-01T15:00:00"}, "2022-01-01T15:00:00+00:00": {"value": "2022-01-01T15:00:00+00:00"}}), - window_length: int = Query(..., description="Window Length in days", example=1), + timestamps: List[Union[date, datetime]] = Query(..., description="Timestamps in format YYYY-MM-DD or YYYY-MM-DDTHH:mm:ss or YYYY-MM-DDTHH:mm:ss+zz:zz", examples=["2022-01-01", "2022-01-01T15:00:00", "2022-01-01T15:00:00+00:00"]), + window_length: int = Query(..., description="Window Length in days", examples=[1]), include_bad_data: bool = Query(..., description="Include or remove Bad data points"), ): self.data_type = data_type @@ -171,11 +171,11 @@ def __init__( class TimeWeightedAverageQueryParams: def __init__( self, - window_size_mins: int = Query(..., description="window_size_mins is deprecated and will be removed in v1.0.0. 
Please use time_interval_rate and time_interval_unit instead.", example=20, deprecated=True), - time_interval_rate: str = Query(..., description="Time Interval Rate as a numeric input", example=5), - time_interval_unit: str = Query(..., description="Time Interval Unit can be one of the options: [second, minute, day, hour]", examples={"second": {"value": "second"}, "minute": {"value": "minute"}, "hour": {"value": "hour"}, "day": {"value": "day"}}), - window_length: int = Query(..., description="Window Length in days", example=1), - step: str = Query(..., description="Step can be true or false", examples={"true": {"value": "true"}, "false": {"value": "false"}}) + window_size_mins: int = Query(..., description="window_size_mins is deprecated and will be removed in v1.0.0. Please use time_interval_rate and time_interval_unit instead.", examples=[20], deprecated=True), + time_interval_rate: str = Query(..., description="Time Interval Rate as a numeric input", examples=[5]), + time_interval_unit: str = Query(..., description="Time Interval Unit can be one of the options: [second, minute, day, hour]", examples=["second", "minute", "hour", "day"]), + window_length: int = Query(..., description="Window Length in days", examples=[1]), + step: str = Query(..., description="Step can be \"true\", \"false\" or \"metadata\". \"metadata\" will retrieve the step value from the metadata table.", examples=["true", "false", "metadata"]) ): self.window_size_mins = window_size_mins self.time_interval_rate = time_interval_rate diff --git a/src/api/v1/raw.py b/src/api/v1/raw.py index 58f2b00d3..a70606796 100644 --- a/src/api/v1/raw.py +++ b/src/api/v1/raw.py @@ -39,7 +39,7 @@ def raw_events_get(base_query_parameters, raw_query_parameters, tag_query_parame get_description = """ ## Raw -Retrieval of raw timeseries data. Refer to the following [documentation](https://www.rtdip.io/sdk/code-reference/query/raw/) for further information. +Retrieval of raw timeseries data. """ @api_v1_router.get( @@ -48,7 +48,8 @@ def raw_events_get(base_query_parameters, raw_query_parameters, tag_query_parame description=get_description, tags=["Events"], dependencies=[Depends(oauth2_scheme)], - responses={200: {"model": RawResponse}, 400: {"model": HTTPError}} + responses={200: {"model": RawResponse}, 400: {"model": HTTPError}}, + openapi_extra={"externalDocs": {"description": "RTDIP Raw Query Documentation", "url": "https://www.rtdip.io/sdk/code-reference/query/raw/"}} ) async def raw_get( base_query_parameters: BaseQueryParams = Depends(), @@ -61,7 +62,7 @@ async def raw_get( post_description = """ ## Raw -Retrieval of raw timeseries data via a POST method to enable providing a list of tag names that can exceed url length restrictions via GET Query Parameters. Refer to the following [documentation](https://www.rtdip.io/sdk/code-reference/query/raw/) for further information. +Retrieval of raw timeseries data via a POST method to enable providing a list of tag names that can exceed url length restrictions via GET Query Parameters. 
""" @api_v1_router.post( @@ -70,7 +71,8 @@ async def raw_get( description=post_description, tags=["Events"], dependencies=[Depends(oauth2_scheme)], - responses={200: {"model": RawResponse}, 400: {"model": HTTPError}} + responses={200: {"model": RawResponse}, 400: {"model": HTTPError}}, + openapi_extra={"externalDocs": {"description": "RTDIP Raw Query Documentation", "url": "https://www.rtdip.io/sdk/code-reference/query/raw/"}} ) async def raw_post( base_query_parameters: BaseQueryParams = Depends(), diff --git a/src/api/v1/resample.py b/src/api/v1/resample.py index c365746ea..6df27a1c8 100644 --- a/src/api/v1/resample.py +++ b/src/api/v1/resample.py @@ -39,7 +39,7 @@ def resample_events_get(base_query_parameters, raw_query_parameters, tag_query_p get_description = """ ## Resample -Resampling of raw timeseries data. Refer to the following [documentation](https://www.rtdip.io/sdk/code-reference/query/resample/) for further information. +Resampling of raw timeseries data. """ @api_v1_router.get( @@ -47,7 +47,8 @@ def resample_events_get(base_query_parameters, raw_query_parameters, tag_query_p name="Resample GET", description=get_description, tags=["Events"], - responses={200: {"model": ResampleInterpolateResponse}, 400: {"model": HTTPError}} + responses={200: {"model": ResampleInterpolateResponse}, 400: {"model": HTTPError}}, + openapi_extra={"externalDocs": {"description": "RTDIP Resample Query Documentation", "url": "https://www.rtdip.io/sdk/code-reference/query/resample/"}} ) async def resample_get( base_query_parameters: BaseQueryParams = Depends(), @@ -60,7 +61,7 @@ async def resample_get( post_description = """ ## Resample -Resampling of raw timeseries data via a POST method to enable providing a list of tag names that can exceed url length restrictions via GET Query Parameters. Refer to the following [documentation](https://www.rtdip.io/sdk/code-reference/query/resample/) for further information. +Resampling of raw timeseries data via a POST method to enable providing a list of tag names that can exceed url length restrictions via GET Query Parameters. """ @api_v1_router.post( @@ -68,7 +69,8 @@ async def resample_get( name="Resample POST", description=post_description, tags=["Events"], - responses={200: {"model": ResampleInterpolateResponse}, 400: {"model": HTTPError}} + responses={200: {"model": ResampleInterpolateResponse}, 400: {"model": HTTPError}}, + openapi_extra={"externalDocs": {"description": "RTDIP Resample Query Documentation", "url": "https://www.rtdip.io/sdk/code-reference/query/resample/"}} ) async def resample_post( base_query_parameters: BaseQueryParams = Depends(), diff --git a/src/api/v1/time_weighted_average.py b/src/api/v1/time_weighted_average.py index 40ab9f89d..151a60a37 100644 --- a/src/api/v1/time_weighted_average.py +++ b/src/api/v1/time_weighted_average.py @@ -1,3 +1,17 @@ +# Copyright 2022 RTDIP +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ import logging import numpy as np from src.api.FastAPIApp import api_v1_router @@ -29,7 +43,7 @@ def time_weighted_average_events_get(base_query_parameters, raw_query_parameters get_description = """ ## Time Weighted Average -Time weighted average of raw timeseries data. Refer to the following [documentation](https://www.rtdip.io/sdk/code-reference/query/time-weighted-average/) for further information. +Time weighted average of raw timeseries data. """ @api_v1_router.get( @@ -37,7 +51,8 @@ def time_weighted_average_events_get(base_query_parameters, raw_query_parameters name="Time Weighted Average GET", description=get_description, tags=["Events"], - responses={200: {"model": ResampleInterpolateResponse}, 400: {"model": HTTPError}} + responses={200: {"model": ResampleInterpolateResponse}, 400: {"model": HTTPError}}, + openapi_extra={"externalDocs": {"description": "RTDIP Time Weighted Averages Query Documentation", "url": "https://www.rtdip.io/sdk/code-reference/query/time-weighted-average/"}} ) async def time_weighted_average_get( base_query_parameters: BaseQueryParams = Depends(), @@ -50,7 +65,7 @@ async def time_weighted_average_get( post_description = """ ## Time Weighted Average -Time weighted average of raw timeseries data via a POST method to enable providing a list of tag names that can exceed url length restrictions via GET Query Parameters. Refer to the following [documentation](https://www.rtdip.io/sdk/code-reference/query/time-weighted-average/) for further information. +Time weighted average of raw timeseries data via a POST method to enable providing a list of tag names that can exceed url length restrictions via GET Query Parameters. """ @api_v1_router.post( @@ -58,7 +73,8 @@ async def time_weighted_average_get( name="Time Weighted Average POST", description=get_description, tags=["Events"], - responses={200: {"model": ResampleInterpolateResponse}, 400: {"model": HTTPError}} + responses={200: {"model": ResampleInterpolateResponse}, 400: {"model": HTTPError}}, + openapi_extra={"externalDocs": {"description": "RTDIP Time Weighted Averages Query Documentation", "url": "https://www.rtdip.io/sdk/code-reference/query/time-weighted-average/"}} ) async def time_weighted_average_post( base_query_parameters: BaseQueryParams = Depends(), diff --git a/src/sdk/python/rtdip_sdk/pipelines/_pipeline_utils/amqp.py b/src/sdk/python/rtdip_sdk/pipelines/_pipeline_utils/amqp.py new file mode 100644 index 000000000..c49734e76 --- /dev/null +++ b/src/sdk/python/rtdip_sdk/pipelines/_pipeline_utils/amqp.py @@ -0,0 +1,298 @@ +# Copyright 2022 RTDIP +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
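+# Minimal AMQP 1.0 type decoder used by the Kafka Eventhub source to convert raw
+# message header bytes into string properties.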
+ +import struct +import uuid +from typing import cast, List, Callable +from datetime import datetime +from pyspark.sql.functions import udf +from pyspark.sql.types import MapType, StringType + +SYSTEM_PROPERTIES = { + "x-opt-sequence-number": b'\x52', + "x-opt-offset": b"\xa1", + "x-opt-partition-key": b"\xa1", + "x-opt-enqueued-time": b"\x83", + "message-id": b"\xa1", + "user-id": b"\xa1", + "to": b"\xa1", + "subject": b"\xa1", + "reply-to": b"\xa1", + "correlation-id": b"\xa1", + "content-type": b"\xa1", + "content-encoding": b"\xa1", + "absolute-expiry-time": b"\x83", + "creation-time": b"\x83", + "group-id": b"\xa1", + "group-sequence": b"\xa1", + "reply-to-group-id": b"\xa1", +} + +c_unsigned_char = struct.Struct('>B') +c_signed_char = struct.Struct('>b') +c_unsigned_short = struct.Struct('>H') +c_signed_short = struct.Struct('>h') +c_unsigned_int = struct.Struct('>I') +c_signed_int = struct.Struct('>i') +c_unsigned_long = struct.Struct('>L') +c_unsigned_long_long = struct.Struct('>Q') +c_signed_long_long = struct.Struct('>q') +c_float = struct.Struct('>f') +c_double = struct.Struct('>d') + +def _decode_null(buffer): + return buffer, None + + +def _decode_true(buffer): + return buffer, True + + +def _decode_false(buffer): + return buffer, False + + +def _decode_zero(buffer): + return buffer, 0 + + +def _decode_empty(buffer): + return buffer, [] + + +def _decode_boolean(buffer): + return buffer[1:], buffer[:1] == b'\x01' + + +def _decode_ubyte(buffer): + return buffer[1:], buffer[0] + + +def _decode_ushort(buffer): + return buffer[2:], c_unsigned_short.unpack(buffer[:2])[0] + + +def _decode_uint_small(buffer): + return buffer[1:], buffer[0] + + +def _decode_uint_large(buffer): + return buffer[4:], c_unsigned_int.unpack(buffer[:4])[0] + + +def _decode_ulong_small(buffer): + return buffer[1:], buffer[0] + + +def _decode_ulong_large(buffer): + return buffer[8:], c_unsigned_long_long.unpack(buffer[:8])[0] + + +def _decode_byte(buffer): + return buffer[1:], c_signed_char.unpack(buffer[:1])[0] + + +def _decode_short(buffer): + return buffer[2:], c_signed_short.unpack(buffer[:2])[0] + + +def _decode_int_small(buffer): + return buffer[1:], c_signed_char.unpack(buffer[:1])[0] + + +def _decode_int_large(buffer): + return buffer[4:], c_signed_int.unpack(buffer[:4])[0] + + +def _decode_long_small(buffer): + return buffer[1:], c_signed_char.unpack(buffer[:1])[0] + + +def _decode_long_large(buffer): + return buffer[8:], c_signed_long_long.unpack(buffer[:8])[0] + + +def _decode_float(buffer): + return buffer[4:], c_float.unpack(buffer[:4])[0] + + +def _decode_double(buffer): + return buffer[8:], c_double.unpack(buffer[:8])[0] + + +def _decode_timestamp(buffer): + return buffer[8:], c_signed_long_long.unpack(buffer[:8])[0] + + +def _decode_uuid(buffer): + return buffer[16:], uuid.UUID(bytes=buffer[:16].tobytes()) + + +def _decode_binary_small(buffer): + length_index = buffer[0] + 1 + return buffer[length_index:], buffer[1:length_index].tobytes() + + +def _decode_binary_large(buffer): + length_index = c_unsigned_long.unpack(buffer[:4])[0] + 4 + return buffer[length_index:], buffer[4:length_index].tobytes() + + +def _decode_list_small(buffer): + count = buffer[1] + buffer = buffer[2:] + values = [None] * count + for i in range(count): + buffer, values[i] = _DECODE_BY_CONSTRUCTOR[buffer[0]](buffer[1:]) + return buffer, values + + +def _decode_list_large(buffer): + count = c_unsigned_long.unpack(buffer[4:8])[0] + buffer = buffer[8:] + values = [None] * count + for i in range(count): + buffer, values[i] = 
_DECODE_BY_CONSTRUCTOR[buffer[0]](buffer[1:]) + return buffer, values + + +def _decode_map_small(buffer): + count = int(buffer[1]/2) + buffer = buffer[2:] + values = {} + for _ in range(count): + buffer, key = _DECODE_BY_CONSTRUCTOR[buffer[0]](buffer[1:]) + buffer, value = _DECODE_BY_CONSTRUCTOR[buffer[0]](buffer[1:]) + values[key] = value + return buffer, values + + +def _decode_map_large(buffer): + count = int(c_unsigned_long.unpack(buffer[4:8])[0]/2) + buffer = buffer[8:] + values = {} + for _ in range(count): + buffer, key = _DECODE_BY_CONSTRUCTOR[buffer[0]](buffer[1:]) + buffer, value = _DECODE_BY_CONSTRUCTOR[buffer[0]](buffer[1:]) + values[key] = value + return buffer, values + + +def _decode_array_small(buffer): + count = buffer[1] # Ignore first byte (size) and just rely on count + if count: + subconstructor = buffer[2] + buffer = buffer[3:] + values = [None] * count + for i in range(count): + buffer, values[i] = _DECODE_BY_CONSTRUCTOR[subconstructor](buffer) + return buffer, values + return buffer[2:], [] + + +def _decode_array_large(buffer): + count = c_unsigned_long.unpack(buffer[4:8])[0] + if count: + subconstructor = buffer[8] + buffer = buffer[9:] + values = [None] * count + for i in range(count): + buffer, values[i] = _DECODE_BY_CONSTRUCTOR[subconstructor](buffer) + return buffer, values + return buffer[8:], [] + +_COMPOSITES = { + 35: 'received', + 36: 'accepted', + 37: 'rejected', + 38: 'released', + 39: 'modified', +} + +def _decode_described(buffer): + composite_type = buffer[0] + buffer, descriptor = _DECODE_BY_CONSTRUCTOR[composite_type](buffer[1:]) + buffer, value = _DECODE_BY_CONSTRUCTOR[buffer[0]](buffer[1:]) + try: + composite_type = cast(int, _COMPOSITES[descriptor]) + return buffer, {composite_type: value} + except KeyError: + return buffer, value + +_DECODE_BY_CONSTRUCTOR: List[Callable] = cast(List[Callable], [None] * 256) +_DECODE_BY_CONSTRUCTOR[0] = _decode_described +_DECODE_BY_CONSTRUCTOR[64] = _decode_null +_DECODE_BY_CONSTRUCTOR[65] = _decode_true +_DECODE_BY_CONSTRUCTOR[66] = _decode_false +_DECODE_BY_CONSTRUCTOR[67] = _decode_zero +_DECODE_BY_CONSTRUCTOR[68] = _decode_zero +_DECODE_BY_CONSTRUCTOR[69] = _decode_empty +_DECODE_BY_CONSTRUCTOR[80] = _decode_ubyte +_DECODE_BY_CONSTRUCTOR[81] = _decode_byte +_DECODE_BY_CONSTRUCTOR[82] = _decode_uint_small +_DECODE_BY_CONSTRUCTOR[83] = _decode_ulong_small +_DECODE_BY_CONSTRUCTOR[84] = _decode_int_small +_DECODE_BY_CONSTRUCTOR[85] = _decode_long_small +_DECODE_BY_CONSTRUCTOR[86] = _decode_boolean +_DECODE_BY_CONSTRUCTOR[96] = _decode_ushort +_DECODE_BY_CONSTRUCTOR[97] = _decode_short +_DECODE_BY_CONSTRUCTOR[112] = _decode_uint_large +_DECODE_BY_CONSTRUCTOR[113] = _decode_int_large +_DECODE_BY_CONSTRUCTOR[114] = _decode_float +_DECODE_BY_CONSTRUCTOR[128] = _decode_ulong_large +_DECODE_BY_CONSTRUCTOR[129] = _decode_long_large +_DECODE_BY_CONSTRUCTOR[130] = _decode_double +_DECODE_BY_CONSTRUCTOR[131] = _decode_timestamp +_DECODE_BY_CONSTRUCTOR[152] = _decode_uuid +_DECODE_BY_CONSTRUCTOR[160] = _decode_binary_small +_DECODE_BY_CONSTRUCTOR[161] = _decode_binary_small +_DECODE_BY_CONSTRUCTOR[163] = _decode_binary_small +_DECODE_BY_CONSTRUCTOR[176] = _decode_binary_large +_DECODE_BY_CONSTRUCTOR[177] = _decode_binary_large +_DECODE_BY_CONSTRUCTOR[179] = _decode_binary_large +_DECODE_BY_CONSTRUCTOR[192] = _decode_list_small +_DECODE_BY_CONSTRUCTOR[193] = _decode_map_small +_DECODE_BY_CONSTRUCTOR[208] = _decode_list_large +_DECODE_BY_CONSTRUCTOR[209] = _decode_map_large +_DECODE_BY_CONSTRUCTOR[224] = _decode_array_small 
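+# 0xF0 (array32) mirrors 0xE0 (array8) above, but reads a 4-byte element count.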
+_DECODE_BY_CONSTRUCTOR[240] = _decode_array_large + + +def _decode_to_string(decoder_value, value): + if decoder_value == b"\x83": + return datetime.fromtimestamp(int(value)/1000).strftime("%Y-%m-%dT%H:%M:%S.%fZ") + elif type(value) is bytes or type(value) is bytearray: + return value.decode("utf-8") + else: + return str(value) + +@udf(returnType=MapType(StringType(), StringType())) +def decode_kafka_headers_to_amqp_properties(headers: dict) -> dict: + if headers is None or len(headers) == 0 or type(headers) is not dict: + return {} + else: + properties = {} + for key, value in headers.items(): + try: + if key in SYSTEM_PROPERTIES: + properties[key] = _decode_to_string(SYSTEM_PROPERTIES[key], value) + else: + decoder_value = value[0:1] + buffer_val = memoryview(value) + buffer_val, decoded_value = _DECODE_BY_CONSTRUCTOR[buffer_val[0]](buffer_val[1:]) + properties[key] = _decode_to_string(decoder_value, decoded_value) + except Exception as e: + print(f"Error decoding header {key}: {e}") + properties[key] = _decode_to_string(None, value) + return properties diff --git a/src/sdk/python/rtdip_sdk/pipelines/_pipeline_utils/weather.py b/src/sdk/python/rtdip_sdk/pipelines/_pipeline_utils/weather.py old mode 100644 new mode 100755 index 7678d1864..19a3ebdec --- a/src/sdk/python/rtdip_sdk/pipelines/_pipeline_utils/weather.py +++ b/src/sdk/python/rtdip_sdk/pipelines/_pipeline_utils/weather.py @@ -4,57 +4,84 @@ # See the License for the specific language governing permissions and # limitations under the License. -from pyspark.sql.types import StructType, StructField, DoubleType, StringType, IntegerType +from pyspark.sql.types import StructType, StructField, DoubleType, StringType, IntegerType, TimestampType WEATHER_FORECAST_SCHEMA = StructType( [ - StructField("CLASS", StringType(), True), - StructField("CLDS", IntegerType(), True), - StructField("DAY_IND", StringType(), True), - StructField("DEWPT", IntegerType(), True), - StructField("DOW", StringType(), True), - StructField("EXPIRE_TIME_GMT", IntegerType(), True), - StructField("FCST_VALID", IntegerType(), True), - StructField("FCST_VALID_LOCAL", StringType(), True), - StructField("FEELS_LIKE", IntegerType(), True), - StructField("GOLF_CATEGORY", StringType(), True), - StructField("GOLF_INDEX", DoubleType(), True), - StructField("GUST", DoubleType(), True), - StructField("HI", IntegerType(), True), - StructField("ICON_CODE", IntegerType(), True), - StructField("ICON_EXTD", IntegerType(), True), - StructField("MSLP", DoubleType(), True), - StructField("NUM", IntegerType(), True), - StructField("PHRASE_12CHAR", StringType(), True), - StructField("PHRASE_22CHAR", StringType(), True), - StructField("PHRASE_32CHAR", StringType(), True), - StructField("POP", StringType(), True), - StructField("PRECIP_TYPE", StringType(), True), - StructField("QPF", DoubleType(), True), - StructField("RH", IntegerType(), True), - StructField("SEVERITY", IntegerType(), True), - StructField("SNOW_QPF", DoubleType(), True), - StructField("SUBPHRASE_PT1", StringType(), True), - StructField("SUBPHRASE_PT2", StringType(), True), - StructField("SUBPHRASE_PT3", StringType(), True), - StructField("TEMP", IntegerType(), True), - StructField("UV_DESC", StringType(), True), - StructField("UV_INDEX", IntegerType(), True), - StructField("UV_INDEX_RAW", DoubleType(), True), - StructField("UV_WARNING", IntegerType(), True), - StructField("VIS", DoubleType(), True), - StructField("WC", IntegerType(), True), - StructField("WDIR", IntegerType(), True), - StructField("WDIR_CARDINAL", 
StringType(), True), - StructField("WSPD", IntegerType(), True), - StructField("WXMAN", StringType(), True), + StructField("Latitude", DoubleType(), True), + StructField("Longitude", DoubleType(), True), + StructField("Class", StringType(), True), + StructField("ExpireTimeGmt", IntegerType(), True), + StructField("FcstValid", IntegerType(), True), + StructField("FcstValidLocal", StringType(), True), + StructField("Num", IntegerType(), True), + StructField("DayInd", StringType(), True), + StructField("Temp", IntegerType(), True), + StructField("Dewpt", IntegerType(), True), + StructField("Hi", IntegerType(), True), + StructField("Wc", IntegerType(), True), + StructField("FeelsLike", IntegerType(), True), + StructField("IconExtd", IntegerType(), True), + StructField("Wxman", StringType(), True), + StructField("IconCode", IntegerType(), True), + StructField("Dow", StringType(), True), + StructField("Phrase12Char", StringType(), True), + StructField("Phrase22Char", StringType(), True), + StructField("Phrase32Char", StringType(), True), + StructField("SubphrasePt1", StringType(), True), + StructField("SubphrasePt2", StringType(), True), + StructField("SubphrasePt3", StringType(), True), + StructField("Pop", StringType(), True), + StructField("PrecipType", StringType(), True), + StructField("Qpf", DoubleType(), True), + StructField("SnowQpf", DoubleType(), True), + StructField("Rh", IntegerType(), True), + StructField("Wspd", IntegerType(), True), + StructField("Wdir", IntegerType(), True), + StructField("WdirCardinal", StringType(), True), + StructField("Gust", DoubleType(), True), + StructField("Clds", IntegerType(), True), + StructField("Vis", DoubleType(), True), + StructField("Mslp", DoubleType(), True), + StructField("UvIndexRaw", DoubleType(), True), + StructField("UvIndex", IntegerType(), True), + StructField("UvWarning", IntegerType(), True), + StructField("UvDesc", StringType(), True), + StructField("GolfIndex", DoubleType(), True), + StructField("GolfCategory", StringType(), True), + StructField("Severity", IntegerType(), True), ] ) -WEATHER_FORECAST_MULTI_SCHEMA = StructType( +WEATHER_DATA_MODEL = StructType( [ - StructField("LATITUDE", DoubleType(), True), - StructField("LONGITUDE", DoubleType(), True), - *WEATHER_FORECAST_SCHEMA.fields + StructField("Latitude", DoubleType(), False), + StructField("Longitude", DoubleType(), False), + StructField('WeatherDay', StringType(), False), + StructField('WeatherHour', IntegerType(), False), + StructField('WeatherTimezoneOffset', StringType(), False), + StructField('WeatherType', StringType(), False), + StructField('ProcessedDate', TimestampType(), False), + StructField('Temperature', DoubleType(), True), + StructField('DewPoint', DoubleType(), True), + StructField('Humidity', DoubleType(), True), + StructField('HeatIndex', DoubleType(), True), + StructField('WindChill', DoubleType(), True), + StructField('WindDirection', DoubleType(), True), + StructField('WindSpeed', DoubleType(), True), + StructField('CloudCover', DoubleType(), True), + StructField('WetBulbTemp', StringType(), True), + StructField('SolarIrradiance', StringType(), True), + StructField('Precipitation', DoubleType(), True), + StructField('DayOrNight', StringType(), True), + StructField('DayOfWeek', StringType(), True), + StructField('WindGust', IntegerType(), True), + StructField('MslPressure', DoubleType(), True), + StructField('ForecastDayNum', IntegerType(), True), + StructField('PropOfPrecip', IntegerType(), True), + StructField('PrecipType', StringType(), True), + 
StructField('SnowAccumulation', DoubleType(), True), + StructField('UvIndex', DoubleType(), True), + StructField('Visibility', DoubleType(), True) ] ) diff --git a/src/sdk/python/rtdip_sdk/pipelines/destinations/__init__.py b/src/sdk/python/rtdip_sdk/pipelines/destinations/__init__.py index bcda3cd94..fc9968c0e 100644 --- a/src/sdk/python/rtdip_sdk/pipelines/destinations/__init__.py +++ b/src/sdk/python/rtdip_sdk/pipelines/destinations/__init__.py @@ -19,4 +19,5 @@ from .spark.kinesis import * from .spark.rest_api import * from .spark.pcdm_to_delta import * -from .blockchain.evm import * \ No newline at end of file +from .blockchain.evm import * +from .python.delta import * \ No newline at end of file diff --git a/src/sdk/python/rtdip_sdk/pipelines/destinations/python/__init__.py b/src/sdk/python/rtdip_sdk/pipelines/destinations/python/__init__.py new file mode 100644 index 000000000..0b3d67993 --- /dev/null +++ b/src/sdk/python/rtdip_sdk/pipelines/destinations/python/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2022 RTDIP +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. \ No newline at end of file diff --git a/src/sdk/python/rtdip_sdk/pipelines/destinations/python/delta.py b/src/sdk/python/rtdip_sdk/pipelines/destinations/python/delta.py new file mode 100644 index 000000000..365399472 --- /dev/null +++ b/src/sdk/python/rtdip_sdk/pipelines/destinations/python/delta.py @@ -0,0 +1,93 @@ +# Copyright 2022 RTDIP +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import time +import pandas as pd +from deltalake import write_deltalake, DeltaTable +from typing import Literal +import pyarrow as pa +import polars as pl +from polars import LazyFrame +from typing import Callable +from ..interfaces import DestinationInterface +from ..._pipeline_utils.models import Libraries, SystemType +from ..._pipeline_utils.constants import get_default_package + +class PythonDeltaDestination(DestinationInterface): + ''' + The Python Delta Destination is used to write data to a Delta table from a Polars LazyFrame. + + Args: + data (LazyFrame): Polars LazyFrame to be written to Delta + path (str): Path to Delta table to be written to; either local or [remote](https://delta-io.github.io/delta-rs/python/usage.html#loading-a-delta-table){ target="_blank" }. **Locally** if the Table does't exist one will be created, but to write to AWS or Azure, you must have an existing Delta Table + options (Optional dict): Used if writing to a remote location. 
For AWS use format {"aws_access_key_id": "<>", "aws_secret_access_key": "<>"}. For Azure use format {"azure_storage_account_name": "storageaccountname", "azure_storage_access_key": "<>"} + mode (Literal['error', 'append', 'overwrite', 'ignore']): Defaults to error if table exists, 'ignore' won't write anything if table exists + overwrite_schema (bool): If True will allow for the table schema to be overwritten + delta_write_options (dict): Options when writing to a Delta table. See [here](https://delta-io.github.io/delta-rs/python/api_reference.html#writing-deltatables){ target="_blank" } for all options + ''' + data: LazyFrame + path: str + options: dict + mode: Literal['error', 'append', 'overwrite', 'ignore'] + overwrite_schema: bool + delta_write_options: bool + + def __init__(self, data: LazyFrame, path: str, options: dict = None, mode: Literal['error', 'append', 'overwrite', 'ignore'] = 'error', overwrite_schema: bool = False, delta_write_options: bool = False, query_name = None) -> None: + self.data = data + self.path = path + self.options = options + self.mode = mode + self.overwrite_schema = overwrite_schema + self.delta_write_options = delta_write_options + + @staticmethod + def system_type(): + ''' + Attributes: + SystemType (Environment): Requires PYTHON + ''' + return SystemType.PYTHON + + @staticmethod + def libraries(): + libraries = Libraries() + return libraries + + @staticmethod + def settings() -> dict: + return {} + + def pre_write_validation(self): + return True + + def post_write_validation(self): + return True + + def write_batch(self): + ''' + Writes batch data to Delta without using Spark. + ''' + if isinstance(self.data, pl.LazyFrame): + df = self.data.collect() + df.write_delta(self.path, mode=self.mode, overwrite_schema= self.overwrite_schema, storage_options=self.options, delta_write_options=self.delta_write_options) + else: + raise ValueError("Data must be a Polars LazyFrame. See https://pola-rs.github.io/polars/py-polars/html/reference/lazyframe/index.html") + + def write_stream(self): + ''' + Raises: + NotImplementedError: Writing to a Delta table using Python is only possible for batch writes. To perform a streaming read, use the write_stream method of the SparkDeltaDestination component. + ''' + raise NotImplementedError("Writing to a Delta table using Python is only possible for batch writes. To perform a streaming read, use the write_stream method of the SparkDeltaDestination component") \ No newline at end of file diff --git a/src/sdk/python/rtdip_sdk/pipelines/sources/__init__.py b/src/sdk/python/rtdip_sdk/pipelines/sources/__init__.py index c825764f6..10faa820e 100644 --- a/src/sdk/python/rtdip_sdk/pipelines/sources/__init__.py +++ b/src/sdk/python/rtdip_sdk/pipelines/sources/__init__.py @@ -22,3 +22,5 @@ from .spark.kinesis import * from .spark.iso import * from .spark.weather import * +from .python.delta import * +from .python.delta_sharing import * diff --git a/src/sdk/python/rtdip_sdk/pipelines/sources/python/__init__.py b/src/sdk/python/rtdip_sdk/pipelines/sources/python/__init__.py new file mode 100644 index 000000000..0b3d67993 --- /dev/null +++ b/src/sdk/python/rtdip_sdk/pipelines/sources/python/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2022 RTDIP +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. \ No newline at end of file diff --git a/src/sdk/python/rtdip_sdk/pipelines/sources/python/delta.py b/src/sdk/python/rtdip_sdk/pipelines/sources/python/delta.py new file mode 100644 index 000000000..03e822357 --- /dev/null +++ b/src/sdk/python/rtdip_sdk/pipelines/sources/python/delta.py @@ -0,0 +1,82 @@ +# Copyright 2022 RTDIP +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ..interfaces import SourceInterface +from ..._pipeline_utils.models import Libraries, SystemType +from ..._pipeline_utils.constants import get_default_package +import polars as pl +from polars import LazyFrame + +class PythonDeltaSource(SourceInterface): + ''' + The Python Delta Source is used to read data from a Delta table without using Apache Spark, returning a Polars LazyFrame + + Args: + path (str): Path to the Delta table. Can be local or in S3/Azure storage + version (optional int): Specify the Delta table version to read from. Defaults to the latest version + storage_options (optional dict): Used to read from AWS/Azure storage. For AWS use format {"aws_access_key_id": "<>", "aws_secret_access_key":"<>"}. For Azure use format {"azure_storage_account_name": "<>", "azure_storage_account_key": "<>"}. + pyarrow_options (optional dict): Data Access and Efficiency options when reading from Delta. See [to_pyarrow_dataset](https://delta-io.github.io/delta-rs/python/api_reference.html#deltalake.table.DeltaTable.to_pyarrow_dataset){ target="_blank" }. 
+ without_files (optional bool): If True loads the table without tracking files + ''' + path: str + version: int + storage_options: dict + pyarrow_options: dict + without_files: bool + + def __init__(self, path: str, version: int = None, storage_options: dict = None, pyarrow_options: dict = None, without_files: bool = False): + self.path = path + self.version = version + self.storage_options = storage_options + self.pyarrow_options = pyarrow_options + self.without_files = without_files + + @staticmethod + def system_type(): + ''' + Attributes: + SystemType (Environment): Requires PYTHON + ''' + return SystemType.PYTHON + + @staticmethod + def libraries(): + libraries = Libraries() + return libraries + + @staticmethod + def settings() -> dict: + return {} + + def pre_read_validation(self): + return True + + def post_read_validation(self): + return True + + def read_batch(self) -> LazyFrame: + ''' + Reads data from a Delta table into a Polars LazyFrame + ''' + without_files_dict = {"without_files": self.without_files} + lf = pl.scan_delta(source= self.path, version= self.version, storage_options= self.storage_options, delta_table_options= without_files_dict, pyarrow_options= self.pyarrow_options) + return lf + + def read_stream(self): + ''' + Raises: + NotImplementedError: Reading from a Delta table using Python is only possible for batch reads. To perform a streaming read, use the read_stream method of the SparkDeltaSource component. + ''' + raise NotImplementedError("Reading from a Delta table using Python is only possible for batch reads. To perform a streaming read, use the read_stream method of the SparkDeltaSource component") + \ No newline at end of file diff --git a/src/sdk/python/rtdip_sdk/pipelines/sources/python/delta_sharing.py b/src/sdk/python/rtdip_sdk/pipelines/sources/python/delta_sharing.py new file mode 100644 index 000000000..e6da9dfe1 --- /dev/null +++ b/src/sdk/python/rtdip_sdk/pipelines/sources/python/delta_sharing.py @@ -0,0 +1,81 @@ +# Copyright 2022 RTDIP +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ..interfaces import SourceInterface +from ..._pipeline_utils.models import Libraries, SystemType +from ..._pipeline_utils.constants import get_default_package +import delta_sharing +import polars as pl +from polars import LazyFrame + + +class PythonDeltaSharingSource(SourceInterface): + ''' + The Python Delta Sharing Source is used to read data from a Delta table with Delta Sharing configured, without using Apache Spark. + + Args: + profile_path (str): Location of the credential file. 
Can be any URL supported by [FSSPEC](https://filesystem-spec.readthedocs.io/en/latest/index.html){ target="_blank" } + share_name (str): The value of 'share=' for the table + schema_name (str): The value of 'schema=' for the table + table_name (str): The value of 'name=' for the table + + ''' + profile_path: str + share_name: str + schema_name: str + table_name: str + + def __init__(self, profile_path: str, share_name: str, schema_name: str, table_name: str): + self.profile_path = profile_path + self.share_name = share_name + self.schema_name = schema_name + self.table_name = table_name + + @staticmethod + def system_type(): + ''' + Attributes: + SystemType (Environment): Requires PYTHON + ''' + return SystemType.PYTHON + + @staticmethod + def libraries(): + libraries = Libraries() + return libraries + + @staticmethod + def settings() -> dict: + return {} + + def pre_read_validation(self): + return True + + def post_read_validation(self): + return True + + def read_batch(self) -> LazyFrame: + ''' + Reads data from a Delta table with Delta Sharing into a Polars LazyFrame. + ''' + pandas_df = delta_sharing.load_as_pandas(f"{self.profile_path}#{self.share_name}.{self.schema_name}.{self.table_name}") + polars_lazyframe = pl.from_pandas(pandas_df).lazy() + return polars_lazyframe + + def read_stream(self): + ''' + Raises: + NotImplementedError: Reading from a Delta table with Delta Sharing using Python is only possible for batch reads. + ''' + raise NotImplementedError("Reading from a Delta table with Delta Sharing using Python is only possible for batch reads.") diff --git a/src/sdk/python/rtdip_sdk/pipelines/sources/spark/kafka_eventhub.py b/src/sdk/python/rtdip_sdk/pipelines/sources/spark/kafka_eventhub.py index b3fda043a..e61d67491 100644 --- a/src/sdk/python/rtdip_sdk/pipelines/sources/spark/kafka_eventhub.py +++ b/src/sdk/python/rtdip_sdk/pipelines/sources/spark/kafka_eventhub.py @@ -16,13 +16,16 @@ import logging from py4j.protocol import Py4JJavaError from pyspark.sql import DataFrame, SparkSession -from pyspark.sql.functions import col, map_from_entries +from pyspark.sql.functions import col, map_from_entries, udf +from pyspark.sql.types import MapType, StringType from urllib.parse import urlparse from ..interfaces import SourceInterface from ..._pipeline_utils.models import Libraries, SystemType from ..._pipeline_utils.spark import KAFKA_EVENTHUB_SCHEMA from ..._pipeline_utils.constants import get_default_package +from ..._pipeline_utils.amqp import decode_kafka_headers_to_amqp_properties + class SparkKafkaEventhubSource(SourceInterface): ''' @@ -209,8 +212,7 @@ def _configure_options(self, options: dict) -> dict: if "kafka.group.id" not in options: options["kafka.group.id"] = self.consumer_group - if "includeHeaders" not in options: - options["includeHeaders"] = "true" + options["includeHeaders"] = "true" return options @@ -222,7 +224,7 @@ def _transform_to_eventhub_schema(self, df: DataFrame) -> DataFrame: col("partition").cast("string"), col("offset").alias("sequenceNumber"), col("timestamp").alias("enqueuedTime"), - col("headers").alias("properties").cast("map") + decode_kafka_headers_to_amqp_properties(col("headers")).alias("properties") ) ) diff --git a/src/sdk/python/rtdip_sdk/pipelines/sources/spark/weather/weather_forecast_api_v1.py b/src/sdk/python/rtdip_sdk/pipelines/sources/spark/weather/weather_forecast_api_v1.py index 936a8c617..093a26c6f 100644 --- a/src/sdk/python/rtdip_sdk/pipelines/sources/spark/weather/weather_forecast_api_v1.py +++ 
b/src/sdk/python/rtdip_sdk/pipelines/sources/spark/weather/weather_forecast_api_v1.py @@ -70,7 +70,52 @@ def _prepare_data(self, df: pd.DataFrame) -> pd.DataFrame: """ - df.columns = list(map(lambda x: x.upper(), df.columns)) + rename_cols = { + "latitude": "Latitude", + "longitude": "Longitude", + "class": "Class", + "expire_time_gmt": "ExpireTimeGmt", + "fcst_valid": "FcstValid", + "fcst_valid_local": "FcstValidLocal", + "num": "Num", + "day_ind": "DayInd", + "temp": "Temp", + "dewpt": "Dewpt", + "hi": "Hi", + "wc": "Wc", + "feels_like": "FeelsLike", + "icon_extd": "IconExtd", + "wxman": "Wxman", + "icon_code": "IconCode", + "dow": "Dow", + "phrase_12char": "Phrase12Char", + "phrase_22char": "Phrase22Char", + "phrase_32char": "Phrase32Char", + "subphrase_pt1": "SubphrasePt1", + "subphrase_pt2": "SubphrasePt2", + "subphrase_pt3": "SubphrasePt3", + "pop": "Pop", + "precip_type": "PrecipType", + "qpf": "Qpf", + "snow_qpf": "SnowQpf", + "rh": "Rh", + "wspd": "Wspd", + "wdir": "Wdir", + "wdir_cardinal": "WdirCardinal", + "gust": "Gust", + "clds": "Clds", + "vis": "Vis", + "mslp": "Mslp", + "uv_index_raw": "UvIndexRaw", + "uv_index": "UvIndex", + "uv_warning": "UvWarning", + "uv_desc": "UvDesc", + "golf_index": "GolfIndex", + "golf_category": "GolfCategory", + "severity": "Severity", + } + + df = df.rename(columns=rename_cols) fields = self.spark_schema.fields @@ -82,6 +127,8 @@ def _prepare_data(self, df: pd.DataFrame) -> pd.DataFrame: df[double_cols] = df[double_cols].astype(float) df[int_cols] = df[int_cols].astype(int) + df.reset_index(inplace=True, drop=True) + return df def _get_api_params(self): @@ -92,7 +139,6 @@ def _get_api_params(self): } return params - print() def _pull_for_weather_station(self, lat: str, lon: str) -> pd.DataFrame: response = json.loads(self._fetch_from_url(f"{lat}/{lon}/forecast/hourly/360hour.json").decode("utf-8")) return pd.DataFrame(response["forecasts"]) @@ -105,4 +151,8 @@ def _pull_data(self) -> pd.DataFrame: Raw form of data. """ - return self._pull_for_weather_station(self.lat, self.lon) + df = self._pull_for_weather_station(self.lat, self.lon) + df["latitude"] = self.lat + df["longitude"] = self.lon + + return df diff --git a/src/sdk/python/rtdip_sdk/pipelines/sources/spark/weather/weather_forecast_api_v1_multi.py b/src/sdk/python/rtdip_sdk/pipelines/sources/spark/weather/weather_forecast_api_v1_multi.py index b79db7f06..745bcdd9f 100644 --- a/src/sdk/python/rtdip_sdk/pipelines/sources/spark/weather/weather_forecast_api_v1_multi.py +++ b/src/sdk/python/rtdip_sdk/pipelines/sources/spark/weather/weather_forecast_api_v1_multi.py @@ -12,11 +12,10 @@ # See the License for the specific language governing permissions and # limitations under the License. 
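For readers unfamiliar with the rename-then-cast pattern used in _prepare_data above, here is a minimal standalone pandas sketch. The mapping shown is a small illustrative subset, not the full rename_cols dictionary from the diff, and the sample values are placeholders.

import pandas as pd

# Illustrative subset of the raw-to-PascalCase column mapping used above
rename_cols = {"temp": "Temp", "dewpt": "Dewpt", "fcst_valid_local": "FcstValidLocal"}

raw = pd.DataFrame({
    "temp": [91, 91],
    "dewpt": [77, 77],
    "fcst_valid_local": ["2023-06-16T15:00:00-0500", "2023-06-16T16:00:00-0500"],
})
prepared = raw.rename(columns=rename_cols)
prepared["Temp"] = prepared["Temp"].astype(float)  # numeric columns are cast to match the Spark schema
prepared = prepared.reset_index(drop=True)         # mirrors the reset_index added in the diff
print(prepared.dtypes)
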
-import json import pandas as pd from pyspark.sql import SparkSession -from ...._pipeline_utils.weather import WEATHER_FORECAST_MULTI_SCHEMA +from ...._pipeline_utils.weather import WEATHER_FORECAST_SCHEMA from .weather_forecast_api_v1 import WeatherForecastAPIV1Source @@ -48,7 +47,7 @@ class WeatherForecastAPIV1MultiSource(WeatherForecastAPIV1Source): spark: SparkSession options: dict - spark_schema = WEATHER_FORECAST_MULTI_SCHEMA + spark_schema = WEATHER_FORECAST_SCHEMA required_options = ["stations", "api_key"] def __init__(self, spark: SparkSession, options: dict) -> None: @@ -69,7 +68,6 @@ def _pull_data(self) -> pd.DataFrame: """ result_df = None - for station in self.stations: parts = station.split(",") lat, lon = parts @@ -78,7 +76,10 @@ def _pull_data(self) -> pd.DataFrame: df["latitude"] = lat df["longitude"] = lon - result_df = pd.concat([result_df, df]) if result_df is not None else df + if result_df is not None: + result_df = pd.concat([result_df, df]) + else: + result_df = df return result_df diff --git a/src/sdk/python/rtdip_sdk/pipelines/transformers/spark/raw_forecast_to_weather_data_model.py b/src/sdk/python/rtdip_sdk/pipelines/transformers/spark/raw_forecast_to_weather_data_model.py new file mode 100644 index 000000000..8cef1a82f --- /dev/null +++ b/src/sdk/python/rtdip_sdk/pipelines/transformers/spark/raw_forecast_to_weather_data_model.py @@ -0,0 +1,130 @@ +# Copyright 2022 RTDIP +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from datetime import datetime + +from pyspark.sql import DataFrame, SparkSession +from pyspark.sql.functions import when, substring, lit, col, concat +from pyspark.sql.types import IntegerType + +from ..interfaces import TransformerInterface +from ..._pipeline_utils.models import Libraries, SystemType +from ..._pipeline_utils.weather import WEATHER_DATA_MODEL + + +class RawForecastToWeatherDataModel(TransformerInterface): + ''' + Converts a raw forecast into weather data model + + Args: + spark (SparkSession): Spark Session instance. + data (DataFrame): Dataframe to be transformed + ''' + spark: SparkSession + data: DataFrame + + def __init__(self, spark: SparkSession, data: DataFrame, ) -> None: + self.spark = spark + self.data = data + self.target_schema = WEATHER_DATA_MODEL + + @staticmethod + def system_type(): + ''' + Attributes: + SystemType (Environment): Requires PYSPARK + ''' + return SystemType.PYSPARK + + @staticmethod + def libraries(): + libraries = Libraries() + return libraries + + @staticmethod + def settings() -> dict: + return {} + + def pre_transform_validation(self): + return True + + def post_transform_validation(self) -> bool: + assert str(self.data.schema) == str(self.target_schema) + return True + + def _convert_into_target_schema(self) -> None: + """ + Converts a Spark DataFrame structure into new structure based on the Target Schema. + + Returns: Nothing. 
+ + """ + + df: DataFrame = self.data + df = df.select(self.target_schema.names) + + + for field in self.target_schema.fields: + df = df.withColumn(field.name, col(field.name).cast(field.dataType)) + + self.data = self.spark.createDataFrame(df.rdd, self.target_schema) + + def transform(self) -> DataFrame: + ''' + Returns: + DataFrame: A Forecast dataframe converted into Weather Data Model + ''' + + self.pre_transform_validation() + + processed_date = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S") + + df = ( + self.data + .withColumn("WeatherDay", substring("FcstValidLocal", 0, 10)) + .withColumn("WeatherHour", (substring("FcstValidLocal", 12, 2).cast(IntegerType()) + 1)) + .withColumn("WeatherTimezoneOffset", substring("FcstValidLocal", 20, 5)) + .withColumn("WeatherType", lit("F")) + .withColumn("ProcessedDate", lit(processed_date)) + .withColumnRenamed("Temp", "Temperature") + .withColumnRenamed("Dewpt", "DewPoint") + .withColumnRenamed("Rh", "Humidity") + .withColumnRenamed("Hi", "HeatIndex") + .withColumnRenamed("Wc", "WindChill") + .withColumnRenamed("Wdir", "WindDirection") + .withColumnRenamed("Wspd", "WindSpeed") + .withColumnRenamed("Clds", "CloudCover") + .withColumn("WetBulbTemp", lit("")) + .withColumn("SolarIrradiance", lit("")) + .withColumnRenamed("Qpf", "Precipitation") + .withColumnRenamed("DayInd", "DayOrNight") + .withColumnRenamed("Dow", "DayOfWeek") + .withColumnRenamed("Gust", "WindGust") + .withColumnRenamed("Mslp", "MslPressure") + .withColumnRenamed("Num", "ForecastDayNum") + .withColumnRenamed("Pop", "PropOfPrecip") + .withColumnRenamed("PrecipType", "PrecipType") + .withColumnRenamed("SnowQpf", "SnowAccumulation") + .withColumnRenamed("UvIndex", "UvIndex") + .withColumnRenamed("Vis", "Visibility") + ) + + columns = df.columns + for column in columns: + df = df.withColumn(column, when(col(column) == "", lit(None)).otherwise(col(column))) + + self.data = df + self._convert_into_target_schema() + self.post_transform_validation() + + return self.data diff --git a/src/sdk/python/rtdip_sdk/queries/time_series/_query_builder.py b/src/sdk/python/rtdip_sdk/queries/time_series/_query_builder.py index 085e94bed..f505b827f 100644 --- a/src/sdk/python/rtdip_sdk/queries/time_series/_query_builder.py +++ b/src/sdk/python/rtdip_sdk/queries/time_series/_query_builder.py @@ -226,16 +226,27 @@ def _time_weighted_average_query(parameters_dict: dict) -> str: time_weighted_average_query = ( "WITH raw_events AS (SELECT DISTINCT EventDate, TagName, from_utc_timestamp(to_timestamp(date_format(EventTime, 'yyyy-MM-dd HH:mm:ss.SSS')), \"{{ time_zone }}\") as EventTime, Status, Value FROM `{{ business_unit }}`.`sensors`.`{{ asset }}_{{ data_security_level }}_events_{{ data_type }}` WHERE EventDate BETWEEN date_sub(to_date(to_timestamp(\"{{ start_date }}\")), {{ window_length }}) AND date_add(to_date(to_timestamp(\"{{ end_date }}\")), {{ window_length }}) AND TagName in ('{{ tag_names | join('\\', \\'') }}') " "{% if include_bad_data is defined and include_bad_data == false %} AND Status = 'Good' {% endif %}) " + "{% if step is defined and step == \"metadata\" %} " + ",meta_data AS (SELECT TagName, IFNULL(Step, false) AS Step FROM `downstream`.`sensors`.`pernis_restricted_metadata` )" + "{% endif %}" ",date_array AS (SELECT explode(sequence(from_utc_timestamp(to_timestamp(\"{{ start_date }}\"), \"{{ time_zone }}\"), from_utc_timestamp(to_timestamp(\"{{ end_date }}\"), \"{{ time_zone }}\"), INTERVAL '{{ time_interval_rate + ' ' + time_interval_unit }}')) AS EventTime, explode(array('{{ 
tag_names | join('\\', \\'') }}')) AS TagName) " ",window_events AS (SELECT coalesce(a.TagName, b.TagName) AS TagName, coalesce(a.EventTime, b.EventTime) as EventTime, window(coalesce(a.EventTime, b.EventTime), '{{ time_interval_rate + ' ' + time_interval_unit }}').start WindowEventTime, b.Status, b.Value FROM date_array a " "FULL OUTER JOIN raw_events b ON CAST(a.EventTime AS long) = CAST(b.EventTime AS long) AND a.TagName = b.TagName) " ",fill_status AS (SELECT *, last_value(Status, true) OVER (PARTITION BY TagName ORDER BY EventTime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) as Fill_Status, CASE WHEN Fill_Status = \"Good\" THEN Value ELSE null END AS Good_Value FROM window_events) " ",fill_value AS (SELECT *, last_value(Good_Value, true) OVER (PARTITION BY TagName ORDER BY EventTime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS Fill_Value FROM fill_status) " + "{% if step is defined and step == \"metadata\" %} " + ",twa_calculations AS (SELECT f.TagName, f.EventTime, f.WindowEventTime, m.Step, f.Status, f.Value, f.Fill_Status, f.Fill_Value, lead(f.EventTime) OVER (PARTITION BY f.TagName ORDER BY f.EventTime) AS Next_EventTime, lead(f.Fill_Status) OVER (PARTITION BY f.TagName ORDER BY f.EventTime) AS Next_Status " + ",CASE WHEN Next_Status = \"Good\" OR (f.Fill_Status = \"Good\" AND Next_Status = \"Bad\") THEN lead(f.Fill_Value) OVER (PARTITION BY f.TagName ORDER BY f.EventTime) ELSE f.Value END AS Next_Value_For_Status " + ",CASE WHEN f.Fill_Status = \"Good\" THEN Next_Value_For_Status ELSE 0 END AS Next_Value " + ",CASE WHEN f.Fill_Status = \"Good\" and Next_Status = \"Good\" THEN ((cast(Next_EventTime as double) - cast(f.EventTime as double)) / 60) WHEN f.Fill_Status = \"Good\" and Next_Status != \"Good\" THEN ((cast(Next_EventTime as integer) - cast(f.EventTime as double)) / 60) ELSE 0 END AS good_minutes " + ",CASE WHEN m.Step == false THEN ((f.Fill_Value + Next_Value) * 0.5) * good_minutes ELSE (f.Fill_Value * good_minutes) END AS twa_value FROM fill_value f LEFT JOIN meta_data m ON f.TagName = m.TagName) " + "{% else %} " ",twa_calculations AS (SELECT TagName, EventTime, WindowEventTime, {{ step }} AS Step, Status, Value, Fill_Status, Fill_Value, lead(EventTime) OVER (PARTITION BY TagName ORDER BY EventTime) AS Next_EventTime, lead(Fill_Status) OVER (PARTITION BY TagName ORDER BY EventTime) AS Next_Status " ",CASE WHEN Next_Status = \"Good\" OR (Fill_Status = \"Good\" AND Next_Status = \"Bad\") THEN lead(Fill_Value) OVER (PARTITION BY TagName ORDER BY EventTime) ELSE Value END AS Next_Value_For_Status " ",CASE WHEN Fill_Status = \"Good\" THEN Next_Value_For_Status ELSE 0 END AS Next_Value " ",CASE WHEN Fill_Status = \"Good\" and Next_Status = \"Good\" THEN ((cast(Next_EventTime as double) - cast(EventTime as double)) / 60) WHEN Fill_Status = \"Good\" and Next_Status != \"Good\" THEN ((cast(Next_EventTime as integer) - cast(EventTime as double)) / 60) ELSE 0 END AS good_minutes " ",CASE WHEN Step == false THEN ((Fill_Value + Next_Value) * 0.5) * good_minutes ELSE (Fill_Value * good_minutes) END AS twa_value FROM fill_value) " + "{% endif %} " ",project_result AS (SELECT TagName, WindowEventTime AS EventTime, sum(twa_value) / sum(good_minutes) AS Value from twa_calculations GROUP BY TagName, WindowEventTime) " "SELECT * FROM project_result WHERE EventTime BETWEEN to_timestamp(\"{{ start_datetime }}\") AND to_timestamp(\"{{ end_datetime }}\") ORDER BY TagName, EventTime " ) diff --git a/src/sdk/python/rtdip_sdk/queries/time_series/time_weighted_average.py 
b/src/sdk/python/rtdip_sdk/queries/time_series/time_weighted_average.py index 9a17b0d3e..169fcdf89 100644 --- a/src/sdk/python/rtdip_sdk/queries/time_series/time_weighted_average.py +++ b/src/sdk/python/rtdip_sdk/queries/time_series/time_weighted_average.py @@ -40,7 +40,7 @@ def get(connection: object, parameters_dict: dict) -> pd.DataFrame: time_interval_unit (str): The time interval unit (second, minute, day, hour) window_length (int): Add longer window time in days for the start or end of specified date to cater for edge cases. include_bad_data (bool): Include "Bad" data points with True or remove "Bad" data points with False - step (str): data points with step "enabled" or "disabled". The options for step are "metadata", "true" or "false". "metadata" will get the step requirements from the metadata table if applicable. + step (str): data points with step "enabled" or "disabled". The options for step are "true", "false" or "metadata". "metadata" will retrieve the step value from the metadata table. Returns: DataFrame: A dataframe containing the time weighted averages. ''' diff --git a/tests/sdk/python/rtdip_sdk/pipelines/destinations/python/__init__.py b/tests/sdk/python/rtdip_sdk/pipelines/destinations/python/__init__.py new file mode 100644 index 000000000..0b3d67993 --- /dev/null +++ b/tests/sdk/python/rtdip_sdk/pipelines/destinations/python/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2022 RTDIP +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. \ No newline at end of file diff --git a/tests/sdk/python/rtdip_sdk/pipelines/destinations/python/test_delta.py b/tests/sdk/python/rtdip_sdk/pipelines/destinations/python/test_delta.py new file mode 100644 index 000000000..600679586 --- /dev/null +++ b/tests/sdk/python/rtdip_sdk/pipelines/destinations/python/test_delta.py @@ -0,0 +1,90 @@ +# Copyright 2022 RTDIP +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
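Ahead of the destination tests that follow, a hedged usage sketch of PythonDeltaDestination. The import path, the constructor arguments (data, path, options, mode) and the write_batch call are taken from the tests in this diff; the path and options values are placeholders, not a confirmed configuration.

import polars as pl
from src.sdk.python.rtdip_sdk.pipelines.destinations.python.delta import PythonDeltaDestination

# The destination expects a Polars LazyFrame; passing a pandas DataFrame raises ValueError (see the tests below)
data = pl.LazyFrame({"col1": [1, 2], "col2": [3, 4]})

destination = PythonDeltaDestination(
    data=data,
    path="/tmp/example_delta_table",  # hypothetical local path; cloud paths would also need credentials in options
    options={},                       # e.g. storage credentials, as in the OPTIONS dict used by the tests
    mode="overwrite",
)
destination.write_batch()             # write_stream raises NotImplementedError for this component
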
+
+import sys
+sys.path.insert(0, '.')
+import pytest
+from src.sdk.python.rtdip_sdk.pipelines.destinations.python.delta import PythonDeltaDestination
+from src.sdk.python.rtdip_sdk.pipelines._pipeline_utils.models import Libraries
+from pytest_mock import MockerFixture
+import polars as pl
+import pandas as pd
+
+OPTIONS = {"aws_access_key_id": "id", "aws_secret_access_key": "key"}
+polars_write_mocked = "polars.DataFrame.write_delta"
+
+def test_python_delta_write_setup():
+    data = pl.LazyFrame({"col1": [1, 2], "col2": [3, 4]})
+    delta_destination = PythonDeltaDestination(data, "path", {}, "overwrite")
+    assert delta_destination.system_type().value == 1
+    assert delta_destination.libraries() == Libraries(maven_libraries=[], pypi_libraries=[], pythonwheel_libraries=[])
+    assert isinstance(delta_destination.settings(), dict)
+    assert delta_destination.pre_write_validation()
+    assert delta_destination.post_write_validation()
+
+def test_python_delta_write_batch(mocker: MockerFixture):
+    mocked_write = mocker.patch(polars_write_mocked, return_value = None)
+
+    data = pl.LazyFrame({"col1": [1, 2], "col2": [3, 4]})
+
+    delta_destination = PythonDeltaDestination(data=data, path="path", mode="overwrite")
+    actual = delta_destination.write_batch()
+
+    mocked_write.assert_called_once()
+    assert actual is None
+
+def test_python_delta_write_batch_with_options(mocker: MockerFixture):
+    mocked_write = mocker.patch(polars_write_mocked, return_value = None)
+
+    data = pl.LazyFrame({"col1": [1, 2], "col2": [3, 4]})
+
+    delta_destination = PythonDeltaDestination(data=data, path="path", options=OPTIONS, mode="overwrite")
+    actual = delta_destination.write_batch()
+
+    mocked_write.assert_called_once()
+    assert actual is None
+
+def test_python_delta_write_batch_fails(mocker: MockerFixture):
+    mocker.patch(polars_write_mocked, side_effect = Exception)
+
+    data = pl.LazyFrame({"col1": [1, 2], "col2": [3, 4]})
+    delta_destination = PythonDeltaDestination(data=data, path="path", mode="overwrite")
+
+    with pytest.raises(Exception):
+        delta_destination.write_batch()
+
+def test_python_delta_write_batch_with_options_fails(mocker: MockerFixture):
+    mocker.patch(polars_write_mocked, side_effect = Exception)
+
+    data = pl.LazyFrame({"col1": [1, 2], "col2": [3, 4]})
+    delta_destination = PythonDeltaDestination(data=data, path="path", options=OPTIONS, mode="overwrite")
+
+    with pytest.raises(Exception):
+        delta_destination.write_batch()
+
+def test_python_delta_write_batch_type_fails():
+    with pytest.raises(ValueError) as excinfo:
+        data = pd.DataFrame({"col1": [1, 2], "col2": [3, 4]})
+        delta_destination = PythonDeltaDestination(data=data, path="path", options=OPTIONS, mode="overwrite")
+        delta_destination.write_batch()
+
+    assert str(excinfo.value) == 'Data must be a Polars LazyFrame. See https://pola-rs.github.io/polars/py-polars/html/reference/lazyframe/index.html'
+
+def test_python_delta_write_stream():
+    with pytest.raises(NotImplementedError) as excinfo:
+        data = pl.LazyFrame({"col1": [1, 2], "col2": [3, 4]})
+        delta_destination = PythonDeltaDestination(data=data, path="path", options=OPTIONS, mode="overwrite")
+        delta_destination.write_stream()
+
+    assert str(excinfo.value) == 'Writing to a Delta table using Python is only possible for batch writes.
To perform a streaming read, use the write_stream method of the SparkDeltaDestination component' \ No newline at end of file diff --git a/tests/sdk/python/rtdip_sdk/pipelines/sources/python/__init__.py b/tests/sdk/python/rtdip_sdk/pipelines/sources/python/__init__.py new file mode 100644 index 000000000..0b3d67993 --- /dev/null +++ b/tests/sdk/python/rtdip_sdk/pipelines/sources/python/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2022 RTDIP +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. \ No newline at end of file diff --git a/tests/sdk/python/rtdip_sdk/pipelines/sources/python/test_delta.py b/tests/sdk/python/rtdip_sdk/pipelines/sources/python/test_delta.py new file mode 100644 index 000000000..1e1ad100f --- /dev/null +++ b/tests/sdk/python/rtdip_sdk/pipelines/sources/python/test_delta.py @@ -0,0 +1,43 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
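Before the PythonDeltaSource tests that follow, a short usage sketch of the source defined earlier in this diff. The constructor arguments and read_batch call match the class definition above; the table path and the absence of storage options are placeholders.

from src.sdk.python.rtdip_sdk.pipelines.sources.python.delta import PythonDeltaSource

source = PythonDeltaSource(
    path="/path/to/delta/table",  # placeholder; local, S3 or Azure paths are supported
    version=None,                 # None reads the latest table version
    storage_options=None,         # supply credentials here when reading from cloud storage
)
lazy_frame = source.read_batch()  # returns a polars LazyFrame
df = lazy_frame.collect()         # materialise only when needed
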
+ +import sys +sys.path.insert(0, '.') +from src.sdk.python.rtdip_sdk.pipelines.sources.python.delta import PythonDeltaSource +from src.sdk.python.rtdip_sdk.pipelines._pipeline_utils.models import Libraries +from pytest_mock import MockerFixture +import pytest +import polars as pl +table_path = '/path/to/table' +def test_python_delta_setup(): + delta_source = PythonDeltaSource(table_path) + assert delta_source.system_type().value == 1 + assert delta_source.libraries() == Libraries(maven_libraries=[], pypi_libraries=[], pythonwheel_libraries=[]) + assert isinstance(delta_source.settings(), dict) + assert delta_source.pre_read_validation() + assert delta_source.post_read_validation() + +def test_python_delta_read_batch(mocker: MockerFixture): + delta_source = PythonDeltaSource(table_path) + mocker.patch.object(pl, 'scan_delta', return_value = pl.LazyFrame()) + lf = delta_source.read_batch() + assert isinstance(lf, pl.LazyFrame) + +def test_delta_read_batch_fails(): + delta_source = PythonDeltaSource(table_path) + with pytest.raises(FileNotFoundError): + delta_source.read_batch() + +def test_python_delta_read_stream(): + delta_source = PythonDeltaSource(table_path) + with pytest.raises(NotImplementedError): + delta_source.read_stream() diff --git a/tests/sdk/python/rtdip_sdk/pipelines/sources/python/test_delta_sharing.py b/tests/sdk/python/rtdip_sdk/pipelines/sources/python/test_delta_sharing.py new file mode 100644 index 000000000..2193a3a14 --- /dev/null +++ b/tests/sdk/python/rtdip_sdk/pipelines/sources/python/test_delta_sharing.py @@ -0,0 +1,50 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
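Similarly, a short usage sketch of PythonDeltaSharingSource ahead of its tests. The constructor arguments mirror the class definition above; the profile path and the share, schema and table names are placeholders.

from src.sdk.python.rtdip_sdk.pipelines.sources.python.delta_sharing import PythonDeltaSharingSource

source = PythonDeltaSharingSource(
    profile_path="/path/to/profile.share",  # placeholder credential file; any fsspec-supported URL works
    share_name="share_name",
    schema_name="schema_name",
    table_name="table_name",
)
lazy_frame = source.read_batch()  # loads the shared table via delta-sharing and wraps it in a polars LazyFrame
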
+ +import sys +sys.path.insert(0, '.') +from src.sdk.python.rtdip_sdk.pipelines.sources.python.delta_sharing import PythonDeltaSharingSource +from src.sdk.python.rtdip_sdk.pipelines._pipeline_utils.models import Libraries +from pytest_mock import MockerFixture +import pytest +import delta_sharing +import pandas as pd +import polars as pl + +profile_path = '/path/to/table' +share_name = 'share_name' +schema_name = 'schema_name' +table_name = 'table_name' + +def test_python_delta_sharing_setup(): + delta_sharing_source = PythonDeltaSharingSource(profile_path, share_name, schema_name, table_name) + assert delta_sharing_source.system_type().value == 1 + assert delta_sharing_source.libraries() == Libraries(maven_libraries=[], pypi_libraries=[], pythonwheel_libraries=[]) + assert isinstance(delta_sharing_source.settings(), dict) + assert delta_sharing_source.pre_read_validation() + assert delta_sharing_source.post_read_validation() + +def test_python_delta_sharing_read_batch(mocker: MockerFixture): + delta_sharing_source = PythonDeltaSharingSource(profile_path, share_name, schema_name, table_name) + mocker.patch.object(delta_sharing, 'load_as_pandas', return_value = pd.DataFrame({'test': ['test_data']})) + lf = delta_sharing_source.read_batch() + assert isinstance(lf, pl.LazyFrame) + +def test_delta_sharing_read_batch_fails(): + delta_sharing_source = PythonDeltaSharingSource(profile_path, share_name, schema_name, table_name) + with pytest.raises(FileNotFoundError): + delta_sharing_source.read_batch() + +def test_python_delta_sharing_read_stream(): + delta_sharing_source = PythonDeltaSharingSource(profile_path, share_name, schema_name, table_name) + with pytest.raises(NotImplementedError): + delta_sharing_source.read_stream() \ No newline at end of file diff --git a/tests/sdk/python/rtdip_sdk/pipelines/sources/spark/weather/test_base_weather.py b/tests/sdk/python/rtdip_sdk/pipelines/sources/spark/weather/test_base_weather.py index 59a54068f..90751642d 100644 --- a/tests/sdk/python/rtdip_sdk/pipelines/sources/spark/weather/test_base_weather.py +++ b/tests/sdk/python/rtdip_sdk/pipelines/sources/spark/weather/test_base_weather.py @@ -20,13 +20,13 @@ from pyspark.sql import DataFrame, SparkSession from pytest_mock import MockerFixture -iso_configuration = { +configuration = { } def test_base_weather_read_setup(spark_session: SparkSession): - base_weather_source = BaseWeatherSource(spark_session, iso_configuration) + base_weather_source = BaseWeatherSource(spark_session, configuration) assert base_weather_source.system_type().value == 2 assert base_weather_source.libraries() == Libraries(maven_libraries=[], pypi_libraries=[], pythonwheel_libraries=[]) @@ -39,7 +39,7 @@ def test_base_weather_read_setup(spark_session: SparkSession): def test_weather_iso_read_stream_exception(spark_session: SparkSession): with pytest.raises(NotImplementedError) as exc_info: - base_weather_source = BaseWeatherSource(spark_session, iso_configuration) + base_weather_source = BaseWeatherSource(spark_session, configuration) base_weather_source.read_stream() assert str(exc_info.value) == "BaseWeatherSource connector doesn't support stream operation." 
@@ -47,7 +47,7 @@ def test_weather_iso_read_stream_exception(spark_session: SparkSession): def test_weather_iso_required_options_fails(spark_session: SparkSession): with pytest.raises(ValueError) as exc_info: - base_weather_source = BaseWeatherSource(spark_session, iso_configuration) + base_weather_source = BaseWeatherSource(spark_session, configuration) base_weather_source.required_options = ["lat"] base_weather_source.pre_read_validation() @@ -55,7 +55,7 @@ def test_weather_iso_required_options_fails(spark_session: SparkSession): def test_weather_iso_fetch_url_fails(spark_session: SparkSession, mocker: MockerFixture): - base_weather_source = BaseWeatherSource(spark_session, iso_configuration) + base_weather_source = BaseWeatherSource(spark_session, configuration) sample_bytes = bytes("Unknown Error".encode("utf-8")) class MyResponse: diff --git a/tests/sdk/python/rtdip_sdk/pipelines/sources/spark/weather/test_weather_forecast_api_v1.py b/tests/sdk/python/rtdip_sdk/pipelines/sources/spark/weather/test_weather_forecast_api_v1.py old mode 100644 new mode 100755 index ab2c43779..f6bebac80 --- a/tests/sdk/python/rtdip_sdk/pipelines/sources/spark/weather/test_weather_forecast_api_v1.py +++ b/tests/sdk/python/rtdip_sdk/pipelines/sources/spark/weather/test_weather_forecast_api_v1.py @@ -128,26 +128,28 @@ ] } -expected_json = {"CLASS": {"0": "fod_short_range_hourly", "1": "fod_short_range_hourly"}, "CLDS": {"0": 79, "1": 69}, - "DAY_IND": {"0": "D", "1": "D"}, "DEWPT": {"0": 77, "1": 77}, "DOW": {"0": "Friday", "1": "Friday"}, - "EXPIRE_TIME_GMT": {"0": 1686945840, "1": 1686945840}, - "FCST_VALID": {"0": 1686945600, "1": 1686949200}, - "FCST_VALID_LOCAL": {"0": "2023-06-16T15:00:00-0500", "1": "2023-06-16T16:00:00-0500"}, - "FEELS_LIKE": {"0": 105, "1": 105}, "GOLF_CATEGORY": {"0": "Very Good", "1": "Very Good"}, - "GOLF_INDEX": {"0": 8.0, "1": 8.0}, "GUST": {"0": None, "1": None}, "HI": {"0": 105, "1": 105}, - "ICON_CODE": {"0": 28, "1": 28}, "ICON_EXTD": {"0": 2800, "1": 2800}, "MSLP": {"0": 29.77, "1": 29.76}, - "NUM": {"0": 1, "1": 2}, "PHRASE_12CHAR": {"0": "M Cloudy", "1": "M Cloudy"}, - "PHRASE_22CHAR": {"0": "Mostly Cloudy", "1": "Mostly Cloudy"}, - "PHRASE_32CHAR": {"0": "Mostly Cloudy", "1": "Mostly Cloudy"}, "POP": {"0": "15", "1": "15"}, - "PRECIP_TYPE": {"0": "rain", "1": "rain"}, "QPF": {"0": 0.0, "1": 0.0}, "RH": {"0": 64, "1": 63}, - "SEVERITY": {"0": 1, "1": 1}, "SNOW_QPF": {"0": 0.0, "1": 0.0}, - "SUBPHRASE_PT1": {"0": "Mostly", "1": "Mostly"}, "SUBPHRASE_PT2": {"0": "Cloudy", "1": "Cloudy"}, - "SUBPHRASE_PT3": {"0": "", "1": ""}, "TEMP": {"0": 91, "1": 91}, - "UV_DESC": {"0": "High", "1": "Moderate"}, - "UV_INDEX": {"0": 6, "1": 5}, "UV_INDEX_RAW": {"0": 5.65, "1": 5.08}, "UV_WARNING": {"0": 0, "1": 0}, - "VIS": {"0": 10.0, "1": 10.0}, "WC": {"0": 91, "1": 91}, "WDIR": {"0": 233, "1": 235}, - "WDIR_CARDINAL": {"0": "SW", "1": "SW"}, "WSPD": {"0": 6, "1": 5}, - "WXMAN": {"0": "wx1230", "1": "wx1230"}} + +expected_json = {"Latitude": {"0": 32.3667, "1": 32.3667}, "Longitude": {"0": -95.4, "1": -95.4}, + "Class": {"0": "fod_short_range_hourly", "1": "fod_short_range_hourly"}, + "ExpireTimeGmt": {"0": 1686945840, "1": 1686945840}, + "FcstValid": {"0": 1686945600, "1": 1686949200}, + "FcstValidLocal": {"0": "2023-06-16T15:00:00-0500", "1": "2023-06-16T16:00:00-0500"}, + "Num": {"0": 1, "1": 2}, "DayInd": {"0": "D", "1": "D"}, "Temp": {"0": 91, "1": 91}, + "Dewpt": {"0": 77, "1": 77}, "Hi": {"0": 105, "1": 105}, "Wc": {"0": 91, "1": 91}, + "FeelsLike": {"0": 105, "1": 105}, 
"IconExtd": {"0": 2800, "1": 2800}, + "Wxman": {"0": "wx1230", "1": "wx1230"}, "IconCode": {"0": 28, "1": 28}, + "Dow": {"0": "Friday", "1": "Friday"}, "Phrase12Char": {"0": "M Cloudy", "1": "M Cloudy"}, + "Phrase22Char": {"0": "Mostly Cloudy", "1": "Mostly Cloudy"}, + "Phrase32Char": {"0": "Mostly Cloudy", "1": "Mostly Cloudy"}, + "SubphrasePt1": {"0": "Mostly", "1": "Mostly"}, "SubphrasePt2": {"0": "Cloudy", "1": "Cloudy"}, + "SubphrasePt3": {"0": "", "1": ""}, "Pop": {"0": "15", "1": "15"}, + "PrecipType": {"0": "rain", "1": "rain"}, "Qpf": {"0": 0.0, "1": 0.0}, + "SnowQpf": {"0": 0.0, "1": 0.0}, "Rh": {"0": 64, "1": 63}, "Wspd": {"0": 6, "1": 5}, + "Wdir": {"0": 233, "1": 235}, "WdirCardinal": {"0": "SW", "1": "SW"}, "Gust": {"0": None, "1": None}, + "Clds": {"0": 79, "1": 69}, "Vis": {"0": 10.0, "1": 10.0}, "Mslp": {"0": 29.77, "1": 29.76}, + "UvIndexRaw": {"0": 5.65, "1": 5.08}, "UvIndex": {"0": 6, "1": 5}, "UvWarning": {"0": 0, "1": 0}, + "UvDesc": {"0": "High", "1": "Moderate"}, "GolfIndex": {"0": 8.0, "1": 8.0}, + "GolfCategory": {"0": "Very Good", "1": "Very Good"}, "Severity": {"0": 1, "1": 1}} def get_api_response() -> str: @@ -199,4 +201,4 @@ def get_response(url: str, params: dict): pdf = df.toPandas() expected_df = pd.DataFrame(expected_json) - assert str(pdf.to_json()) == str(expected_df.to_json()) + assert str(pdf.to_json()) == str(expected_df.to_json()) \ No newline at end of file diff --git a/tests/sdk/python/rtdip_sdk/pipelines/sources/spark/weather/test_weather_forecast_api_v1_multi.py b/tests/sdk/python/rtdip_sdk/pipelines/sources/spark/weather/test_weather_forecast_api_v1_multi.py old mode 100644 new mode 100755 index 145431257..4ca3f2f12 --- a/tests/sdk/python/rtdip_sdk/pipelines/sources/spark/weather/test_weather_forecast_api_v1_multi.py +++ b/tests/sdk/python/rtdip_sdk/pipelines/sources/spark/weather/test_weather_forecast_api_v1_multi.py @@ -17,14 +17,11 @@ import pytest from src.sdk.python.rtdip_sdk.pipelines.sources.spark.weather import WeatherForecastAPIV1MultiSource -from src.sdk.python.rtdip_sdk.pipelines._pipeline_utils.weather import WEATHER_FORECAST_MULTI_SCHEMA +from src.sdk.python.rtdip_sdk.pipelines._pipeline_utils.weather import WEATHER_FORECAST_SCHEMA from src.sdk.python.rtdip_sdk.pipelines._pipeline_utils.models import Libraries -from test_weather_forecast_api_v1 import raw_api_response from pyspark.sql import DataFrame, SparkSession from pytest_mock import MockerFixture -MOSTLY_CLOUDY = "Mostly Cloudy" - configuration = { "api_key": "AA", "language": "en-US", @@ -35,6 +32,105 @@ ] } +raw_api_response1 = { + "metadata": { + "language": "en-US", + "transaction_id": "1686945524067:722833eedd2545334406be2bd368acdf", + "version": "1", + "latitude": 32.36, + "longitude": -95.4, + "units": "e", + "expire_time_gmt": 1686945840, + "status_code": 200 + }, + "forecasts": [ + { + "class": "fod_short_range_hourly_1", + "expire_time_gmt": 1686945844, + "fcst_valid": 1686945604, + "fcst_valid_local": "2023-06-16T18:00:00-0500", + "num": 5, + "day_ind": "E", + "temp": 81, + "dewpt": 67, + "hi": 107, + "wc": 95, + "feels_like": 108, + "icon_extd": 2801, + "wxman": "wx1233", + "icon_code": 24, + "dow": "Saturday", + "phrase_12char": "Cloudy", + "phrase_22char": "Cloudy", + "phrase_32char": "Cloudy", + "subphrase_pt1": "Mostly Cloudy", + "subphrase_pt2": "MostlyCloudy", + "subphrase_pt3": "1", + "pop": 16, + "precip_type": "storm", + "qpf": 0.1, + "snow_qpf": 0.2, + "rh": 65, + "wspd": 8, + "wdir": 235, + "wdir_cardinal": "SE", + "gust": 6, + "clds": 80, + "vis": 
10.1, + "mslp": 29.78, + "uv_index_raw": 5.67, + "uv_index": 8, + "uv_warning": 1, + "uv_desc": "Low", + "golf_index": 9, + "golf_category": "Ok", + "severity": 2 + }, + { + "class": "fod_short_range_hourly_1", + "expire_time_gmt": 1686945843, + "fcst_valid": 168694920, + "fcst_valid_local": "2023-06-16T17:00:00-0500", + "num": 3, + "day_ind": "C", + "temp": 92, + "dewpt": 72, + "hi": 107, + "wc": 92, + "feels_like": 106, + "icon_extd": 2801, + "wxman": "wx1231", + "icon_code": 27, + "dow": "Saturday", + "phrase_12char": "B Cloudy", + "phrase_22char": "Cloudy", + "phrase_32char": "Cloudy", + "subphrase_pt1": "Mostly Windy", + "subphrase_pt2": "Mostly_Cloudy", + "subphrase_pt3": "Cloudy", + "pop": 16, + "precip_type": "wind", + "qpf": 0.1, + "snow_qpf": 0.1, + "rh": 64, + "wspd": 6, + "wdir": 236, + "wdir_cardinal": "SE", + "gust": 236, + "clds": 67, + "vis": 10.1, + "mslp": 29.77, + "uv_index_raw": 5.09, + "uv_index": 6, + "uv_warning": 1, + "uv_desc": "Medium", + "golf_index": 9, + "golf_category": "Good", + "severity": 2 + } + ] +} + raw_api_response2 = { "metadata": { "language": "en-US", @@ -134,265 +230,48 @@ ] } -expected_json = { - "LATITUDE": { - "0": 32.3667, - "1": 32.3667, - "2": 51.52, - "3": 51.52 - }, - "LONGITUDE": { - "0": -95.4, - "1": -95.4, - "2": -0.11, - "3": -0.11 - }, - "CLASS": { - "0": "fod_short_range_hourly", - "1": "fod_short_range_hourly", - "2": "fod_short_range_hourly", - "3": "fod_short_range_hourly" - }, - "CLDS": { - "0": 79, - "1": 69, - "2": 80, - "3": 77 - }, - "DAY_IND": { - "0": "D", - "1": "D", - "2": "D", - "3": "D" - }, - "DEWPT": { - "0": 77, - "1": 77, - "2": 62, - "3": 61 - }, - "DOW": { - "0": "Friday", - "1": "Friday", - "2": "Sunday", - "3": "Sunday" - }, - "EXPIRE_TIME_GMT": { - "0": 1686945840, - "1": 1686945840, - "2": 1687110027, - "3": 1687110027 - }, - "FCST_VALID": { - "0": 1686945600, - "1": 1686949200, - "2": 1687111200, - "3": 1687114800 - }, - "FCST_VALID_LOCAL": { - "0": "2023-06-16T15:00:00-0500", - "1": "2023-06-16T16:00:00-0500", - "2": "2023-06-18T19:00:00+0100", - "3": "2023-06-18T20:00:00+0100" - }, - "FEELS_LIKE": { - "0": 105, - "1": 105, - "2": 66, - "3": 66 - }, - "GOLF_CATEGORY": { - "0": "Very Good", - "1": "Very Good", - "2": "Good", - "3": "Good" - }, - "GOLF_INDEX": { - "0": 8.0, - "1": 8.0, - "2": 6.0, - "3": 6.0 - }, - "GUST": { - "0": None, - "1": None, - "2": None, - "3": None - }, - "HI": { - "0": 105, - "1": 105, - "2": 66, - "3": 66 - }, - "ICON_CODE": { - "0": 28, - "1": 28, - "2": 11, - "3": 11 - }, - "ICON_EXTD": { - "0": 2800, - "1": 2800, - "2": 1201, - "3": 1100 - }, - "MSLP": { - "0": 29.77, - "1": 29.76, - "2": 29.74, - "3": 29.75 - }, - "NUM": { - "0": 1, - "1": 2, - "2": 1, - "3": 2 - }, - "PHRASE_12CHAR": { - "0": "M Cloudy", - "1": "M Cloudy", - "2": "Light Rain", - "3": "Showers" - }, - "PHRASE_22CHAR": { - "0": MOSTLY_CLOUDY, - "1": MOSTLY_CLOUDY, - "2": "Light Rain", - "3": "Showers" - }, - "PHRASE_32CHAR": { - "0": MOSTLY_CLOUDY, - "1": MOSTLY_CLOUDY, - "2": "Light Rain", - "3": "Showers" - }, - "POP": { - "0": "15", - "1": "15", - "2": "86", - "3": "53" - }, - "PRECIP_TYPE": { - "0": "rain", - "1": "rain", - "2": "rain", - "3": "rain" - }, - "QPF": { - "0": 0.0, - "1": 0.0, - "2": 0.02, - "3": 0.01 - }, - "RH": { - "0": 64, - "1": 63, - "2": 85, - "3": 84 - }, - "SEVERITY": { - "0": 1, - "1": 1, - "2": 1, - "3": 1 - }, - "SNOW_QPF": { - "0": 0.0, - "1": 0.0, - "2": 0.0, - "3": 0.0 - }, - "SUBPHRASE_PT1": { - "0": "Mostly", - "1": "Mostly", - "2": "Light", - "3": "Showers" - }, - 
"SUBPHRASE_PT2": { - "0": "Cloudy", - "1": "Cloudy", - "2": "Rain", - "3": "" - }, - "SUBPHRASE_PT3": { - "0": "", - "1": "", - "2": "", - "3": "" - }, - "TEMP": { - "0": 91, - "1": 91, - "2": 66, - "3": 66 - }, - "UV_DESC": { - "0": "High", - "1": "Moderate", - "2": "Low", - "3": "Low" - }, - "UV_INDEX": { - "0": 6, - "1": 5, - "2": 0, - "3": 0 - }, - "UV_INDEX_RAW": { - "0": 5.65, - "1": 5.08, - "2": 0.35, - "3": 0.09 - }, - "UV_WARNING": { - "0": 0, - "1": 0, - "2": 0, - "3": 0 - }, - "VIS": { - "0": 10.0, - "1": 10.0, - "2": 9.0, - "3": 9.0 - }, - "WC": { - "0": 91, - "1": 91, - "2": 66, - "3": 66 - }, - "WDIR": { - "0": 233, - "1": 235, - "2": 207, - "3": 218 - }, - "WDIR_CARDINAL": { - "0": "SW", - "1": "SW", - "2": "SSW", - "3": "SW" - }, - "WSPD": { - "0": 6, - "1": 5, - "2": 4, - "3": 6 - }, - "WXMAN": { - "0": "wx1230", - "1": "wx1230", - "2": "wx2500", - "3": "wx2500" - } -} +expected_json = {"Latitude": {"0": 32.3667, "1": 32.3667, "2": 51.52, "3": 51.52}, + "Longitude": {"0": -95.4, "1": -95.4, "2": -0.11, "3": -0.11}, + "Class": {"0": "fod_short_range_hourly_1", "1": "fod_short_range_hourly_1", + "2": "fod_short_range_hourly", "3": "fod_short_range_hourly"}, + "ExpireTimeGmt": {"0": 1686945844, "1": 1686945843, "2": 1687110027, "3": 1687110027}, + "FcstValid": {"0": 1686945604, "1": 168694920, "2": 1687111200, "3": 1687114800}, + "FcstValidLocal": {"0": "2023-06-16T18:00:00-0500", "1": "2023-06-16T17:00:00-0500", + "2": "2023-06-18T19:00:00+0100", "3": "2023-06-18T20:00:00+0100"}, + "Num": {"0": 5, "1": 3, "2": 1, "3": 2}, "DayInd": {"0": "E", "1": "C", "2": "D", "3": "D"}, + "Temp": {"0": 81, "1": 92, "2": 66, "3": 66}, "Dewpt": {"0": 67, "1": 72, "2": 62, "3": 61}, + "Hi": {"0": 107, "1": 107, "2": 66, "3": 66}, "Wc": {"0": 95, "1": 92, "2": 66, "3": 66}, + "FeelsLike": {"0": 108, "1": 106, "2": 66, "3": 66}, + "IconExtd": {"0": 2801, "1": 2801, "2": 1201, "3": 1100}, + "Wxman": {"0": "wx1233", "1": "wx1231", "2": "wx2500", "3": "wx2500"}, + "IconCode": {"0": 24, "1": 27, "2": 11, "3": 11}, + "Dow": {"0": "Saturday", "1": "Saturday", "2": "Sunday", "3": "Sunday"}, + "Phrase12Char": {"0": "Cloudy", "1": "B Cloudy", "2": "Light Rain", "3": "Showers"}, + "Phrase22Char": {"0": "Cloudy", "1": "Cloudy", "2": "Light Rain", "3": "Showers"}, + "Phrase32Char": {"0": "Cloudy", "1": "Cloudy", "2": "Light Rain", "3": "Showers"}, + "SubphrasePt1": {"0": "Mostly Cloudy", "1": "Mostly Windy", "2": "Light", "3": "Showers"}, + "SubphrasePt2": {"0": "MostlyCloudy", "1": "Mostly_Cloudy", "2": "Rain", "3": ""}, + "SubphrasePt3": {"0": "1", "1": "Cloudy", "2": "", "3": ""}, + "Pop": {"0": "16", "1": "16", "2": "86", "3": "53"}, + "PrecipType": {"0": "storm", "1": "wind", "2": "rain", "3": "rain"}, + "Qpf": {"0": 0.1, "1": 0.1, "2": 0.02, "3": 0.01}, "SnowQpf": {"0": 0.2, "1": 0.1, "2": 0.0, "3": 0.0}, + "Rh": {"0": 65, "1": 64, "2": 85, "3": 84}, "Wspd": {"0": 8, "1": 6, "2": 4, "3": 6}, + "Wdir": {"0": 235, "1": 236, "2": 207, "3": 218}, + "WdirCardinal": {"0": "SE", "1": "SE", "2": "SSW", "3": "SW"}, + "Gust": {"0": 6.0, "1": 236.0, "2": None, "3": None}, "Clds": {"0": 80, "1": 67, "2": 80, "3": 77}, + "Vis": {"0": 10.1, "1": 10.1, "2": 9.0, "3": 9.0}, + "Mslp": {"0": 29.78, "1": 29.77, "2": 29.74, "3": 29.75}, + "UvIndexRaw": {"0": 5.67, "1": 5.09, "2": 0.35, "3": 0.09}, + "UvIndex": {"0": 8, "1": 6, "2": 0, "3": 0}, "UvWarning": {"0": 1, "1": 1, "2": 0, "3": 0}, + "UvDesc": {"0": "Low", "1": "Medium", "2": "Low", "3": "Low"}, + "GolfIndex": {"0": 9.0, "1": 9.0, "2": 6.0, "3": 6.0}, + 
"GolfCategory": {"0": "Ok", "1": "Good", "2": "Good", "3": "Good"}, + "Severity": {"0": 2, "1": 2, "2": 1, "3": 1}} def get_api_response(url: str) -> str: if url == "https://api.weather.com/v1/geocode/32.3667/-95.4/forecast/hourly/360hour.json": - return json.dumps(raw_api_response) + return json.dumps(raw_api_response1) else: return json.dumps(raw_api_response2) @@ -446,9 +325,10 @@ def get_response(url: str, params: dict): assert df.count() == 4 assert isinstance(df, DataFrame) - assert str(df.schema) == str(WEATHER_FORECAST_MULTI_SCHEMA) + assert str(df.schema) == str(WEATHER_FORECAST_SCHEMA) pdf = df.toPandas() + expected_df = pd.DataFrame(expected_json) assert str(pdf.to_json()) == str(expected_df.to_json()) diff --git a/tests/sdk/python/rtdip_sdk/pipelines/transformers/spark/raw_forecast_to_weather_data_model/input.csv b/tests/sdk/python/rtdip_sdk/pipelines/transformers/spark/raw_forecast_to_weather_data_model/input.csv new file mode 100755 index 000000000..dd2e06fb9 --- /dev/null +++ b/tests/sdk/python/rtdip_sdk/pipelines/transformers/spark/raw_forecast_to_weather_data_model/input.csv @@ -0,0 +1,5 @@ +Latitude,Longitude,Class,ExpireTimeGmt,FcstValid,FcstValidLocal,Num,DayInd,Temp,Dewpt,Hi,Wc,FeelsLike,IconExtd,Wxman,IconCode,Dow,Phrase12Char,Phrase22Char,Phrase32Char,SubphrasePt1,SubphrasePt2,SubphrasePt3,Pop,PrecipType,Qpf,SnowQpf,Rh,Wspd,Wdir,WdirCardinal,Gust,Clds,Vis,Mslp,UvIndexRaw,UvIndex,UvWarning,UvDesc,GolfIndex,GolfCategory,Severity +32.3667,-95.4,fod_short_range_hourly,1688133954,1688133600,2023-06-30T09:00:00-0500,1,D,83,72,89,83,89,3400,wx1030,34,Friday,M Sunny,Mostly Sunny,Mostly Sunny,Mostly,Sunny,,1,rain,0.0,0.0,69,13,216,SW,23.0,26,10.0,30.0,2.15,2,0,Low,9.0,Very Good,1 +32.3667,-95.4,fod_short_range_hourly,1688133954,1688137200,2023-06-30T10:00:00-0500,2,D,86,71,92,86,92,3400,wx1030,34,Friday,M Sunny,Mostly Sunny,Mostly Sunny,Mostly,Sunny,,1,rain,0.0,0.0,62,13,217,SW,21.0,22,10.0,30.0,4.33,4,0,Moderate,8.0,Very Good,1 +32.3667,-95.4,fod_short_range_hourly,1688133954,1688140800,2023-06-30T11:00:00-0500,3,D,88,70,95,88,95,3400,wx1030,34,Friday,M Sunny,Mostly Sunny,Mostly Sunny,Mostly,Sunny,,0,rain,0.0,0.0,56,12,214,SSW,20.0,29,10.0,29.99,6.89,7,0,High,8.0,Very Good,1 +32.3667,-95.4,fod_short_range_hourly,1688133954,1688144400,2023-06-30T12:00:00-0500,4,D,91,71,100,91,100,3400,wx1030,34,Friday,M Sunny,Mostly Sunny,Mostly Sunny,Mostly,Sunny,,0,rain,0.0,0.0,53,11,209,SSW,17.0,23,10.0,29.98,9.25,9,0,Very High,8.0,Very Good,1 diff --git a/tests/sdk/python/rtdip_sdk/pipelines/transformers/spark/raw_forecast_to_weather_data_model/output.csv b/tests/sdk/python/rtdip_sdk/pipelines/transformers/spark/raw_forecast_to_weather_data_model/output.csv new file mode 100644 index 000000000..4f061df6f --- /dev/null +++ b/tests/sdk/python/rtdip_sdk/pipelines/transformers/spark/raw_forecast_to_weather_data_model/output.csv @@ -0,0 +1,5 @@ +Latitude,Longitude,WeatherDay,WeatherHour,WeatherTimezoneOffset,WeatherType,ProcessedDate,Temperature,DewPoint,Humidity,HeatIndex,WindChill,WindDirection,WindSpeed,CloudCover,WetBulbTemp,SolarIrradiance,Precipitation,DayOrNight,DayOfWeek,WindGust,MslPressure,ForecastDayNum,PropOfPrecip,PrecipType,SnowAccumulation,UvIndex,Visibility +32.3667,-95.4,2023-06-30,10,-0500,F,2023-06-30 14:57:50,83.0,72.0,69.0,89.0,83.0,216.0,13.0,26.0,,,0.0,D,Friday,23,30.0,1,1,rain,0.0,2.0,10.0 +32.3667,-95.4,2023-06-30,11,-0500,F,2023-06-30 14:57:50,86.0,71.0,62.0,92.0,86.0,217.0,13.0,22.0,,,0.0,D,Friday,21,30.0,2,1,rain,0.0,4.0,10.0 
+32.3667,-95.4,2023-06-30,12,-0500,F,2023-06-30 14:57:50,88.0,70.0,56.0,95.0,88.0,214.0,12.0,29.0,,,0.0,D,Friday,20,29.99,3,0,rain,0.0,7.0,10.0 +32.3667,-95.4,2023-06-30,13,-0500,F,2023-06-30 14:57:50,91.0,71.0,53.0,100.0,91.0,209.0,11.0,23.0,,,0.0,D,Friday,17,29.98,4,0,rain,0.0,9.0,10.0 \ No newline at end of file diff --git a/tests/sdk/python/rtdip_sdk/pipelines/transformers/spark/test_raw_forecast_to_weather_data_model.py b/tests/sdk/python/rtdip_sdk/pipelines/transformers/spark/test_raw_forecast_to_weather_data_model.py new file mode 100755 index 000000000..5d0abaeaf --- /dev/null +++ b/tests/sdk/python/rtdip_sdk/pipelines/transformers/spark/test_raw_forecast_to_weather_data_model.py @@ -0,0 +1,35 @@ +import os + +from pyspark.sql.functions import lit, to_timestamp +from src.sdk.python.rtdip_sdk.pipelines.transformers.spark.raw_forecast_to_weather_data_model import \ + RawForecastToWeatherDataModel +from src.sdk.python.rtdip_sdk.pipelines._pipeline_utils.weather import WEATHER_DATA_MODEL, WEATHER_FORECAST_SCHEMA +from src.sdk.python.rtdip_sdk.pipelines._pipeline_utils.models import Libraries, SystemType +from pyspark.sql import SparkSession, DataFrame + +parent_base_path: str = os.path.join(os.path.dirname(os.path.realpath(__file__)), "raw_forecast_to_weather_data_model") + + +def test_raw_forecast_to_weather_data_model(spark_session: SparkSession): + expected_df: DataFrame = spark_session.read.csv(f"{parent_base_path}/output.csv", header=True, + schema=WEATHER_DATA_MODEL) + input_df: DataFrame = spark_session.read.csv(f"{parent_base_path}/input.csv", header=True, + schema=WEATHER_FORECAST_SCHEMA) + + expected_df = spark_session.createDataFrame(expected_df.rdd, schema=WEATHER_DATA_MODEL) + + transformer = RawForecastToWeatherDataModel(spark_session, input_df) + + actual_df = transformer.transform() + actual_df = actual_df.withColumn("ProcessedDate", to_timestamp(lit("2023-06-30 14:57:50"))) + actual_df = spark_session.createDataFrame(actual_df.rdd, schema=WEATHER_DATA_MODEL) + + cols = expected_df.columns + actual_df = actual_df.orderBy(cols) + expected_df = expected_df.orderBy(cols) + + assert transformer.system_type() == SystemType.PYSPARK + assert isinstance(transformer.libraries(), Libraries) + assert transformer.settings() == dict() + assert str(actual_df.schema) == str(expected_df.schema) + assert str(actual_df.collect()) == str(expected_df.collect()) diff --git a/tests/sdk/python/rtdip_sdk/queries/test_time_weighted_average.py b/tests/sdk/python/rtdip_sdk/queries/test_time_weighted_average.py index 9f3881c7a..fdee603fa 100644 --- a/tests/sdk/python/rtdip_sdk/queries/test_time_weighted_average.py +++ b/tests/sdk/python/rtdip_sdk/queries/test_time_weighted_average.py @@ -27,7 +27,8 @@ ACCESS_TOKEN = "mock_databricks_token" DATABRICKS_SQL_CONNECT = 'databricks.sql.connect' DATABRICKS_SQL_CONNECT_CURSOR = 'databricks.sql.connect.cursor' -MOCKED_QUERY= 'WITH raw_events AS (SELECT DISTINCT EventDate, TagName, from_utc_timestamp(to_timestamp(date_format(EventTime, \'yyyy-MM-dd HH:mm:ss.SSS\')), "+0000") as EventTime, Status, Value FROM `mocked-buiness-unit`.`sensors`.`mocked-asset_mocked-data-security-level_events_mocked-data-type` WHERE EventDate BETWEEN date_sub(to_date(to_timestamp("2011-01-01T00:00:00+00:00")), 1) AND date_add(to_date(to_timestamp("2011-01-02T23:59:59+00:00")), 1) AND TagName in (\'MOCKED-TAGNAME\') ) ,date_array AS (SELECT explode(sequence(from_utc_timestamp(to_timestamp("2011-01-01T00:00:00+00:00"), "+0000"), 
from_utc_timestamp(to_timestamp("2011-01-02T23:59:59+00:00"), "+0000"), INTERVAL \'15 minute\')) AS EventTime, explode(array(\'MOCKED-TAGNAME\')) AS TagName) ,window_events AS (SELECT coalesce(a.TagName, b.TagName) AS TagName, coalesce(a.EventTime, b.EventTime) as EventTime, window(coalesce(a.EventTime, b.EventTime), \'15 minute\').start WindowEventTime, b.Status, b.Value FROM date_array a FULL OUTER JOIN raw_events b ON CAST(a.EventTime AS long) = CAST(b.EventTime AS long) AND a.TagName = b.TagName) ,fill_status AS (SELECT *, last_value(Status, true) OVER (PARTITION BY TagName ORDER BY EventTime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) as Fill_Status, CASE WHEN Fill_Status = "Good" THEN Value ELSE null END AS Good_Value FROM window_events) ,fill_value AS (SELECT *, last_value(Good_Value, true) OVER (PARTITION BY TagName ORDER BY EventTime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS Fill_Value FROM fill_status) ,twa_calculations AS (SELECT TagName, EventTime, WindowEventTime, false AS Step, Status, Value, Fill_Status, Fill_Value, lead(EventTime) OVER (PARTITION BY TagName ORDER BY EventTime) AS Next_EventTime, lead(Fill_Status) OVER (PARTITION BY TagName ORDER BY EventTime) AS Next_Status ,CASE WHEN Next_Status = "Good" OR (Fill_Status = "Good" AND Next_Status = "Bad") THEN lead(Fill_Value) OVER (PARTITION BY TagName ORDER BY EventTime) ELSE Value END AS Next_Value_For_Status ,CASE WHEN Fill_Status = "Good" THEN Next_Value_For_Status ELSE 0 END AS Next_Value ,CASE WHEN Fill_Status = "Good" and Next_Status = "Good" THEN ((cast(Next_EventTime as double) - cast(EventTime as double)) / 60) WHEN Fill_Status = "Good" and Next_Status != "Good" THEN ((cast(Next_EventTime as integer) - cast(EventTime as double)) / 60) ELSE 0 END AS good_minutes ,CASE WHEN Step == false THEN ((Fill_Value + Next_Value) * 0.5) * good_minutes ELSE (Fill_Value * good_minutes) END AS twa_value FROM fill_value) ,project_result AS (SELECT TagName, WindowEventTime AS EventTime, sum(twa_value) / sum(good_minutes) AS Value from twa_calculations GROUP BY TagName, WindowEventTime) SELECT * FROM project_result WHERE EventTime BETWEEN to_timestamp("2011-01-01T00:00:00") AND to_timestamp("2011-01-02T23:59:59") ORDER BY TagName, EventTime ' +MOCKED_QUERY= 'WITH raw_events AS (SELECT DISTINCT EventDate, TagName, from_utc_timestamp(to_timestamp(date_format(EventTime, \'yyyy-MM-dd HH:mm:ss.SSS\')), "+0000") as EventTime, Status, Value FROM `mocked-buiness-unit`.`sensors`.`mocked-asset_mocked-data-security-level_events_mocked-data-type` WHERE EventDate BETWEEN date_sub(to_date(to_timestamp("2011-01-01T00:00:00+00:00")), 1) AND date_add(to_date(to_timestamp("2011-01-02T23:59:59+00:00")), 1) AND TagName in (\'MOCKED-TAGNAME\') ) ,date_array AS (SELECT explode(sequence(from_utc_timestamp(to_timestamp("2011-01-01T00:00:00+00:00"), "+0000"), from_utc_timestamp(to_timestamp("2011-01-02T23:59:59+00:00"), "+0000"), INTERVAL \'15 minute\')) AS EventTime, explode(array(\'MOCKED-TAGNAME\')) AS TagName) ,window_events AS (SELECT coalesce(a.TagName, b.TagName) AS TagName, coalesce(a.EventTime, b.EventTime) as EventTime, window(coalesce(a.EventTime, b.EventTime), \'15 minute\').start WindowEventTime, b.Status, b.Value FROM date_array a FULL OUTER JOIN raw_events b ON CAST(a.EventTime AS long) = CAST(b.EventTime AS long) AND a.TagName = b.TagName) ,fill_status AS (SELECT *, last_value(Status, true) OVER (PARTITION BY TagName ORDER BY EventTime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) as Fill_Status, CASE WHEN Fill_Status = 
"Good" THEN Value ELSE null END AS Good_Value FROM window_events) ,fill_value AS (SELECT *, last_value(Good_Value, true) OVER (PARTITION BY TagName ORDER BY EventTime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS Fill_Value FROM fill_status) ,twa_calculations AS (SELECT TagName, EventTime, WindowEventTime, false AS Step, Status, Value, Fill_Status, Fill_Value, lead(EventTime) OVER (PARTITION BY TagName ORDER BY EventTime) AS Next_EventTime, lead(Fill_Status) OVER (PARTITION BY TagName ORDER BY EventTime) AS Next_Status ,CASE WHEN Next_Status = "Good" OR (Fill_Status = "Good" AND Next_Status = "Bad") THEN lead(Fill_Value) OVER (PARTITION BY TagName ORDER BY EventTime) ELSE Value END AS Next_Value_For_Status ,CASE WHEN Fill_Status = "Good" THEN Next_Value_For_Status ELSE 0 END AS Next_Value ,CASE WHEN Fill_Status = "Good" and Next_Status = "Good" THEN ((cast(Next_EventTime as double) - cast(EventTime as double)) / 60) WHEN Fill_Status = "Good" and Next_Status != "Good" THEN ((cast(Next_EventTime as integer) - cast(EventTime as double)) / 60) ELSE 0 END AS good_minutes ,CASE WHEN Step == false THEN ((Fill_Value + Next_Value) * 0.5) * good_minutes ELSE (Fill_Value * good_minutes) END AS twa_value FROM fill_value) ,project_result AS (SELECT TagName, WindowEventTime AS EventTime, sum(twa_value) / sum(good_minutes) AS Value from twa_calculations GROUP BY TagName, WindowEventTime) SELECT * FROM project_result WHERE EventTime BETWEEN to_timestamp("2011-01-01T00:00:00") AND to_timestamp("2011-01-02T23:59:59") ORDER BY TagName, EventTime ' +METADATA_MOCKED_QUERY = 'WITH raw_events AS (SELECT DISTINCT EventDate, TagName, from_utc_timestamp(to_timestamp(date_format(EventTime, \'yyyy-MM-dd HH:mm:ss.SSS\')), "+0000") as EventTime, Status, Value FROM `mocked-buiness-unit`.`sensors`.`mocked-asset_mocked-data-security-level_events_mocked-data-type` WHERE EventDate BETWEEN date_sub(to_date(to_timestamp("2011-01-01T00:00:00+00:00")), 1) AND date_add(to_date(to_timestamp("2011-01-02T23:59:59+00:00")), 1) AND TagName in (\'MOCKED-TAGNAME\') ) ,meta_data AS (SELECT TagName, IFNULL(Step, false) AS Step FROM `downstream`.`sensors`.`pernis_restricted_metadata` ),date_array AS (SELECT explode(sequence(from_utc_timestamp(to_timestamp("2011-01-01T00:00:00+00:00"), "+0000"), from_utc_timestamp(to_timestamp("2011-01-02T23:59:59+00:00"), "+0000"), INTERVAL \'15 minute\')) AS EventTime, explode(array(\'MOCKED-TAGNAME\')) AS TagName) ,window_events AS (SELECT coalesce(a.TagName, b.TagName) AS TagName, coalesce(a.EventTime, b.EventTime) as EventTime, window(coalesce(a.EventTime, b.EventTime), \'15 minute\').start WindowEventTime, b.Status, b.Value FROM date_array a FULL OUTER JOIN raw_events b ON CAST(a.EventTime AS long) = CAST(b.EventTime AS long) AND a.TagName = b.TagName) ,fill_status AS (SELECT *, last_value(Status, true) OVER (PARTITION BY TagName ORDER BY EventTime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) as Fill_Status, CASE WHEN Fill_Status = "Good" THEN Value ELSE null END AS Good_Value FROM window_events) ,fill_value AS (SELECT *, last_value(Good_Value, true) OVER (PARTITION BY TagName ORDER BY EventTime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS Fill_Value FROM fill_status) ,twa_calculations AS (SELECT f.TagName, f.EventTime, f.WindowEventTime, m.Step, f.Status, f.Value, f.Fill_Status, f.Fill_Value, lead(f.EventTime) OVER (PARTITION BY f.TagName ORDER BY f.EventTime) AS Next_EventTime, lead(f.Fill_Status) OVER (PARTITION BY f.TagName ORDER BY f.EventTime) AS Next_Status ,CASE WHEN 
Next_Status = "Good" OR (f.Fill_Status = "Good" AND Next_Status = "Bad") THEN lead(f.Fill_Value) OVER (PARTITION BY f.TagName ORDER BY f.EventTime) ELSE f.Value END AS Next_Value_For_Status ,CASE WHEN f.Fill_Status = "Good" THEN Next_Value_For_Status ELSE 0 END AS Next_Value ,CASE WHEN f.Fill_Status = "Good" and Next_Status = "Good" THEN ((cast(Next_EventTime as double) - cast(f.EventTime as double)) / 60) WHEN f.Fill_Status = "Good" and Next_Status != "Good" THEN ((cast(Next_EventTime as integer) - cast(f.EventTime as double)) / 60) ELSE 0 END AS good_minutes ,CASE WHEN m.Step == false THEN ((f.Fill_Value + Next_Value) * 0.5) * good_minutes ELSE (f.Fill_Value * good_minutes) END AS twa_value FROM fill_value f LEFT JOIN meta_data m ON f.TagName = m.TagName) ,project_result AS (SELECT TagName, WindowEventTime AS EventTime, sum(twa_value) / sum(good_minutes) AS Value from twa_calculations GROUP BY TagName, WindowEventTime) SELECT * FROM project_result WHERE EventTime BETWEEN to_timestamp("2011-01-01T00:00:00") AND to_timestamp("2011-01-02T23:59:59") ORDER BY TagName, EventTime ' MOCKED_PARAMETER_DICT = { "business_unit": "mocked-buiness-unit", "region": "mocked-region", @@ -79,6 +80,25 @@ def test_time_weighted_average_with_window_size_mins(mocker: MockerFixture): mocked_close.assert_called_once() assert isinstance(actual, pd.DataFrame) + +def test_time_weighted_average_metadata_step(mocker: MockerFixture): + MOCKED_PARAMETER_DICT["step"] = "metadata" + mocked_cursor = mocker.spy(MockedDBConnection, "cursor") + mocked_execute = mocker.spy(MockedCursor, "execute") + mocked_fetch_all = mocker.patch.object(MockedCursor, "fetchall_arrow", return_value = pa.Table.from_pandas(pd.DataFrame(data={'a': [1], 'b': [2], 'c': [3], 'd': [4]}))) + mocked_close = mocker.spy(MockedCursor, "close") + mocker.patch(DATABRICKS_SQL_CONNECT, return_value = MockedDBConnection()) + + mocked_connection = DatabricksSQLConnection(SERVER_HOSTNAME, HTTP_PATH, ACCESS_TOKEN) + + actual = time_weighted_average_get(mocked_connection, MOCKED_PARAMETER_DICT) + + mocked_cursor.assert_called_once() + mocked_execute.assert_called_once_with(mocker.ANY, query=METADATA_MOCKED_QUERY) + mocked_fetch_all.assert_called_once() + mocked_close.assert_called_once() + assert isinstance(actual, pd.DataFrame) + def test_time_weighted_average_fails(mocker: MockerFixture): mocker.spy(MockedDBConnection, "cursor") mocker.spy(MockedCursor, "execute")