forked from sillygoose/multisma2
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsma.py
140 lines (115 loc) · 4.78 KB
/
sma.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
"""SMA WebConnect library for Python.
See: http://www.sma.de/en/products/monitoring-control/webconnect.html
Based on: http://www.github.com/kellerza/pysma
"""
import asyncio
import json
import logging
import async_timeout
import jmespath
from aiohttp import client_exceptions
from configuration import APPLICATION_LOG_LOGGER_NAME
# Logger obtained under the name configured in the project's configuration module.
logger = logging.getLogger(APPLICATION_LOG_LOGGER_NAME)

# Maps the `group` argument of SMA() to the value sent as 'right' when logging in.
USERS = dict(user="usr", installer="istl")

# Endpoints of the inverter's JSON API, appended to the base URL.
URL_LOGIN = "/dyn/login.json"
URL_LOGOUT = "/dyn/logout.json"
URL_VALUES = "/dyn/getValues.json"
URL_LOGGER = "/dyn/getLogger.json"
URL_ONLINE = "/dyn/getAllOnlValues.json"

# JMESPath expressions; none are referenced in the visible part of this module,
# presumably used by importers -- verify before removing.
JMESPATH_BASE = "result.*"
JMESPATH_VAL_IDX = '"1"[{}].val'
JMESPATH_VAL = "val"
class SMA:
    """Client for the JSON API of an SMA WebConnect module.

    A session ID (``sma_sid``) is obtained lazily on the first read and is
    dropped again whenever the inverter reports an error, so that the next
    read logs in afresh.
    """

    def __init__(self, session, url, password, group="user", uid=None):
        """Store the connection settings; no network traffic happens here.

        Args:
            session: Object whose awaitable ``post()`` is used for every
                request (an aiohttp client session, judging by the
                exceptions handled in ``_fetch_json``).
            url: Base address of the inverter. A trailing ``/`` is removed
                and ``http://`` is prepended when no ``http`` prefix is present.
            password: Login password; a warning is logged if it is longer
                than 12 characters.
            group: Key of ``USERS`` selecting the login role.
            uid: Optional key under which the inverter's data appears in
                ``result``; when ``None`` it is taken from the first reply.

        Raises:
            KeyError: If ``group`` is not a key of ``USERS``.
        """
        if group not in USERS:
            raise KeyError("Invalid user type: {}".format(group))
        if len(password) > 12:
            logger.warning("Password should not exceed 12 characters")
        self._new_session_data = {'right': USERS[group], 'pass': password}
        self._url = url.rstrip("/")
        if not url.startswith("http"):
            self._url = "http://" + self._url
        self._aio_session = session
        self.sma_sid = None
        self.sma_uid = uid

    async def _fetch_json(self, url, payload):
        """POST ``payload`` as JSON to ``url`` and return the decoded reply.

        The request is attempted up to three times, each with a 3 second
        timeout. Timeouts and aiohttp client errors trigger a retry.

        Returns:
            The decoded JSON reply (``{}`` when the reply is empty/falsy), or
            a dict with a single ``'err'`` entry when every attempt failed.
        """
        request_kwargs = {
            "data": json.dumps(payload),
            "headers": {"content-type": "application/json"},
            # The session ID is only sent once a login has succeeded.
            "params": {"sid": self.sma_sid} if self.sma_sid else None,
        }
        for _ in range(3):
            try:
                # `async with` is the supported form; the synchronous
                # `with timeout()` form is deprecated by async-timeout.
                async with async_timeout.timeout(3):
                    res = await self._aio_session.post(self._url + url, **request_kwargs)
                    return (await res.json()) or {}
            except (asyncio.TimeoutError, client_exceptions.ClientError):
                continue
        return {"err": "Could not connect to SMA at {} (timeout)".format(self._url)}

    async def _read_body(self, url, payload):
        """Send a request and return the part of the reply for this inverter.

        Logs in first when there is no session. On an ``'err'`` reply the
        session is closed so that the next call logs in again.

        Returns:
            The value stored under ``sma_uid`` in the reply's ``result``
            object, or ``None`` when no session could be created, the
            inverter reported an error, or the reply has no ``result``.
        """
        if self.sma_sid is None:
            await self.new_session()
            if self.sma_sid is None:
                logger.error(f"Unable to create new session with inverter {self._url}")
                return None
        body = await self._fetch_json(url, payload=payload)

        # Valid JSON is not necessarily an object; only dicts can carry 'err'
        # or 'result', so guard before calling dict methods on the reply.
        err = body.get('err') if isinstance(body, dict) else None
        if err is not None:
            # On the first error we close the session which will re-login
            logger.error(
                f"{self._url}: error detected, closing session to force another login attempt, got: {body}",
            )
            await self.close_session()
            return None

        if not isinstance(body, dict) or "result" not in body:
            logger.error("No 'result' in reply from SMA, got: %s", body)
            return None

        if self.sma_uid is None:
            # Get the unique ID: the first key of the result object
            self.sma_uid = next(iter(body["result"].keys()), None)

        result_body = body['result'].pop(self.sma_uid, None)
        # After removing our own entry nothing else is expected in the reply.
        if body != {'result': {}}:
            logger.error(f"Unexpected body {json.dumps(body)}, extracted {json.dumps(result_body)}")
        return result_body

    async def new_session(self):
        """Log in and store the session ID in ``sma_sid``.

        Returns:
            ``True`` when the reply contained ``result.sid``, otherwise
            ``False`` after logging the reason.
        """
        body = await self._fetch_json(URL_LOGIN, self._new_session_data)
        self.sma_sid = jmespath.search("result.sid", body)
        if self.sma_sid:
            return True

        # The reply may be any JSON value; only a dict can hold 'err'.
        err = body.pop('err', None) if isinstance(body, dict) else None
        if not err:
            reason = "Session ID expected [result.sid]"
        elif err == 503:
            reason = "Max amount of sessions reached"
        else:
            reason = err
        # Pass the reply as a logging argument rather than pre-formatting it
        # into the message: a '%' inside the reply would otherwise be read as
        # a format directive by the logger.
        logger.error("Could not start session, %s, got %s", reason, body)
        return False

    async def close_session(self):
        """Log out if a session is open; ``sma_sid`` is always reset to ``None``."""
        if self.sma_sid is None:
            return
        try:
            await self._fetch_json(URL_LOGOUT, {})
        finally:
            self.sma_sid = None

    async def read_values(self, keys):
        """Read a list of one or more keys; returns what ``_read_body`` returns."""
        payload = {"destDev": [], "keys": keys}
        return await self._read_body(URL_VALUES, payload)

    async def read_instantaneous(self):
        """One command to read the sensors in the Instantaneous inverter view."""
        payload = {"destDev": []}
        return await self._read_body(URL_ONLINE, payload)

    async def read_history(self, start, end):
        """Read the history for the period given by ``start`` and ``end``.

        ``start`` and ``end`` are sent unchanged as ``tStart`` and ``tEnd``.
        """
        payload = {"destDev": [], "key": 28704, "tStart": start, "tEnd": end}
        return await self._read_body(URL_LOGGER, payload)