-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
10 changed files
with
249 additions
and
125 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,44 +1,61 @@ | ||
"""API client for interacting with PandaPWR devices.""" | ||
|
||
import aiohttp | ||
import async_timeout | ||
|
||
HTTP_OK = 200 | ||
|
||
|
||
class PandaPWRApi: | ||
def __init__(self, ip_address: str): | ||
"""API client for PandaPWR devices.""" | ||
|
||
def __init__(self, ip_address: str) -> None: | ||
"""Initialize the API client.""" | ||
self._base_url = f"http://{ip_address}" | ||
self._session = aiohttp.ClientSession() | ||
|
||
async def test_connection(self) -> bool: | ||
"""Test if the connection to the device can be established.""" | ||
try: | ||
async with async_timeout.timeout(10): | ||
async with self._session.get( | ||
f"{self._base_url}/update_ele_data" | ||
) as response: | ||
return response.status == 200 | ||
except Exception: | ||
async with ( | ||
async_timeout.timeout(10), | ||
self._session.get(f"{self._base_url}/update_ele_data") as response, | ||
): | ||
return response.status == HTTP_OK | ||
except aiohttp.ClientError: | ||
return False | ||
|
||
async def get_data(self) -> dict: | ||
"""Fetch data from the device.""" | ||
async with async_timeout.timeout(10): | ||
async with self._session.get( | ||
f"{self._base_url}/update_ele_data" | ||
) as response: | ||
try: | ||
async with ( | ||
async_timeout.timeout(10), | ||
self._session.get(f"{self._base_url}/update_ele_data") as response, | ||
): | ||
return await response.json() | ||
except aiohttp.ClientError: | ||
return {} | ||
|
||
async def set_power_state(self, state: int): | ||
async def set_power_state(self, state: int) -> bool: | ||
"""Set power state (0 for off, 1 for on) using RAW payload.""" | ||
payload = f"power={state}" | ||
async with self._session.post( | ||
f"{self._base_url}/set", data=payload | ||
) as response: | ||
return response.status == 200 | ||
try: | ||
async with ( | ||
async_timeout.timeout(10), | ||
self._session.post(f"{self._base_url}/set", data=payload) as response, | ||
): | ||
return response.status == HTTP_OK | ||
except aiohttp.ClientError: | ||
return False | ||
|
||
async def set_usb_state(self, state: int): | ||
async def set_usb_state(self, state: int) -> bool: | ||
"""Set USB state (0 for off, 1 for on) using RAW payload.""" | ||
payload = f"usb={state}" | ||
async with self._session.post( | ||
f"{self._base_url}/set", data=payload | ||
) as response: | ||
return response.status == 200 | ||
try: | ||
async with ( | ||
async_timeout.timeout(10), | ||
self._session.post(f"{self._base_url}/set", data=payload) as response, | ||
): | ||
return response.status == HTTP_OK | ||
except aiohttp.ClientError: | ||
return False |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.