Skip to content
This repository has been archived by the owner on Jan 1, 2023. It is now read-only.

Commit

Permalink
Make token refreshes more reliable
Browse files Browse the repository at this point in the history
  • Loading branch information
muffix committed Nov 30, 2021
1 parent 056c14a commit e90ae64
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 27 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
# v1.0.3

- Increased the frequency of authentication token refreshes

# v1.0.2

- Added support for the `--latest-day` option
Expand Down
59 changes: 32 additions & 27 deletions impf.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,9 @@ def __init__(self, username: str, password: str, citizen_id: str):
self._password = password
self.citizen_id = citizen_id
self.session = requests.Session()
self._auth_token = None
self._refresh_token = None
self._auth_token: Optional[str] = None
self._auth_token_expiry: Optional[datetime.datetime] = None
self._refresh_token: Optional[str] = None

def __enter__(self):
self.stop_schedule = run_schedule()
Expand All @@ -81,10 +82,7 @@ def reset_session(self):
self._refresh_token = None

def _submit_form(
self,
url: str,
body: Dict[str, str],
allow_redirects: bool = True,
self, url: str, body: Dict[str, str], allow_redirects: bool = True
) -> Response:
"""
Submits a form to the API
Expand Down Expand Up @@ -158,11 +156,7 @@ def _login(self):

login_resp = self._submit_form(
self._get_login_action(),
{
"username": self._user,
"password": self._password,
"credentialId": "",
},
{"username": self._user, "password": self._password, "credentialId": ""},
allow_redirects=False,
)

Expand All @@ -172,13 +166,30 @@ def _login(self):
code = parse_qs(state)["code"][0]
self.refresh_auth_token(code)

# Auth tokens are valid for 300 seconds, so let's refresh ours after 270.
schedule.every(270).seconds.do(self.refresh_auth_token)
# Check every 10 seconds if we need to refresh the auth token
schedule.every(10).seconds.do(self.refresh_auth_token)

@property
def is_auth_token_expired(self) -> bool:
return self._auth_token_expiry and self._auth_token_expiry - datetime.datetime.now() < datetime.timedelta(
seconds=30
)

def refresh_auth_token(self, code: Optional[str] = None):
logging.debug("Refreshing auth token")
"""
Authentication tokens are required for calls to the API. They are only valid for a limited time, but can be
refreshed. This method fetches the latest authentication token if necessary.
Optionally accepts an authorisation code obtained from the login endpoint. If an authentication code is passed,
this method fetches the matching token.
:param code: Authorisation code retrieved from the login endpoint
"""
if not code and not self.is_auth_token_expired:
return

if code:
logging.debug("Getting auth token from auth code")
token_rsp = self._submit_form(
self.TOKEN_URL,
{
Expand All @@ -189,6 +200,7 @@ def refresh_auth_token(self, code: Optional[str] = None):
},
)
else:
logging.debug("Refreshing auth token")
token_rsp = self._submit_form(
self.TOKEN_URL,
{
Expand All @@ -200,8 +212,10 @@ def refresh_auth_token(self, code: Optional[str] = None):
token_rsp.raise_for_status()
rsp_json = token_rsp.json()

self._auth_token, self._refresh_token = (
self._auth_token, self._auth_token_expiry, self._refresh_token = (
rsp_json["access_token"],
datetime.datetime.now()
+ datetime.timedelta(seconds=rsp_json["expires_in"]),
rsp_json["refresh_token"],
)

Expand All @@ -223,11 +237,7 @@ def _find_appointment(
appt_rsp = self.session.get(
url_with_params(
self._appointments_url("/next"),
{
"timeOfDay": "ALL_DAY",
"lastDate": earliest_day,
"lastTime": "00:00",
},
{"timeOfDay": "ALL_DAY", "lastDate": earliest_day, "lastTime": "00:00"},
),
headers=self._headers(with_auth=True),
)
Expand Down Expand Up @@ -297,10 +307,7 @@ def _book(self, payload: Dict[str, Union[str, Dict[str, bool]]]) -> bool:
:return: True if the booking was successful, False otherwise
"""

payload["reminderChannel"] = {
"reminderByEmail": True,
"reminderBySms": True,
}
payload["reminderChannel"] = {"reminderByEmail": True, "reminderBySms": True}

book_rsp = self.session.post(
self._appointments_url(),
Expand Down Expand Up @@ -394,9 +401,7 @@ def main():
args = parser.parse_args()

checker = ImpfChecker(
username=args.email,
password=args.password,
citizen_id=args.citizen_id,
username=args.email, password=args.password, citizen_id=args.citizen_id
)

if args.interval is not None:
Expand Down

0 comments on commit e90ae64

Please sign in to comment.