Skip to content

Commit

Permalink
Merge pull request #206 from praw-dev/more_lint
Browse files Browse the repository at this point in the history
Ruff entire project and pyright against prawcore directory
  • Loading branch information
bboe authored Feb 10, 2025
2 parents 7a8cfef + 7ce9768 commit 0f15dd2
Show file tree
Hide file tree
Showing 27 changed files with 417 additions and 590 deletions.
13 changes: 5 additions & 8 deletions examples/caching_requestor.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/usr/bin/env python

"""This example shows how simple in-memory caching can be used.
"""Example program that shows how simple in-memory caching can be used.
Demonstrates the use of custom sessions with :class:`.Requestor`. It's an adaptation of
``read_only_auth_trophies.py``.
Expand Down Expand Up @@ -28,10 +28,9 @@ class CachingSession(requests.Session):
def request(self, method, url, params=None, **kwargs):
"""Perform a request, or return a cached response if available."""
params_key = tuple(params.items()) if params else ()
if method.upper() == "GET":
if (url, params_key) in self.get_cache:
print("Returning cached response for:", method, url, params)
return self.get_cache[(url, params_key)]
if method.upper() == "GET" and (url, params_key) in self.get_cache:
print("Returning cached response for:", method, url, params)
return self.get_cache[(url, params_key)]
result = super().request(method, url, params, **kwargs)
if method.upper() == "GET":
self.get_cache[(url, params_key)] = result
Expand All @@ -45,9 +44,7 @@ def main():
print(f"Usage: {sys.argv[0]} USERNAME")
return 1

caching_requestor = prawcore.Requestor(
"prawcore_device_id_auth_example", session=CachingSession()
)
caching_requestor = prawcore.Requestor("prawcore_device_id_auth_example", session=CachingSession())
authenticator = prawcore.TrustedAuthenticator(
caching_requestor,
os.environ["PRAWCORE_CLIENT_ID"],
Expand Down
2 changes: 1 addition & 1 deletion examples/device_id_auth_trophies.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/usr/bin/env python

"""This example outputs a user's list of trophies.
"""Example program that outputs a user's list of trophies.
This program demonstrates the use of ``prawcore.DeviceIDAuthorizer``.
Expand Down
12 changes: 5 additions & 7 deletions examples/obtain_refresh_token.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/usr/bin/env python

"""This example demonstrates the flow for retrieving a refresh token.
"""Example program that demonstrates the flow for retrieving a refresh token.
In order for this example to work your application's redirect URI must be set to
http://localhost:8080.
Expand Down Expand Up @@ -36,7 +36,7 @@ def receive_connection():
def send_message(client, message):
"""Send message to client and close the connection."""
print(message)
client.send(f"HTTP/1.1 200 OK\r\n\r\n{message}".encode("utf-8"))
client.send(f"HTTP/1.1 200 OK\r\n\r\n{message}".encode())
client.close()


Expand All @@ -53,24 +53,22 @@ def main():
"http://localhost:8080",
)

state = str(random.randint(0, 65000))
state = str(random.randint(0, 65000)) # noqa: S311
url = authenticator.authorize_url("permanent", sys.argv[1:], state)
print(url)

client = receive_connection()
data = client.recv(1024).decode("utf-8")
param_tokens = data.split(" ", 2)[1].split("?", 1)[1].split("&")
params = {
key: value for (key, value) in [token.split("=") for token in param_tokens]
}
params = dict([token.split("=") for token in param_tokens])

if state != params["state"]:
send_message(
client,
f"State mismatch. Expected: {state} Received: {params['state']}",
)
return 1
elif "error" in params:
if "error" in params:
send_message(client, params["error"])
return 1

Expand Down
2 changes: 1 addition & 1 deletion examples/read_only_auth_trophies.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/usr/bin/env python

"""This example outputs a user's list of trophies.
"""Example program that outputs a user's list of trophies.
This program demonstrates the use of ``prawcore.ReadOnlyAuthorizer`` that does not
require an access token to make authenticated requests to Reddit.
Expand Down
77 changes: 29 additions & 48 deletions prawcore/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import time
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, Any, Callable
from typing import TYPE_CHECKING, Any, Callable, cast

from requests import Request
from requests.status_codes import codes
Expand All @@ -30,7 +30,7 @@ def __init__(
requestor: Requestor,
client_id: str,
redirect_uri: str | None = None,
):
) -> None:
"""Represent a single authentication to Reddit's API.
:param requestor: An instance of :class:`.Requestor`.
Expand All @@ -46,23 +46,19 @@ def __init__(
self.client_id = client_id
self.redirect_uri = redirect_uri

def _post(
self, url: str, success_status: int = codes["ok"], **data: Any
) -> Response:
def _post(self, *, url: str, **data: Any) -> Response:
response = self._requestor.request(
"post",
url,
auth=self._auth(),
data=sorted(data.items()),
headers={"Connection": "close"},
)
if response.status_code != success_status:
if response.status_code != codes["ok"]:
raise ResponseException(response)
return response

def authorize_url(
self, duration: str, scopes: list[str], state: str, implicit: bool = False
) -> str:
def authorize_url(self, duration: str, scopes: list[str], state: str, implicit: bool = False) -> str:
"""Return the URL used out-of-band to grant access to your application.
:param duration: Either ``"permanent"`` or ``"temporary"``. ``"temporary"``
Expand Down Expand Up @@ -90,9 +86,7 @@ def authorize_url(
msg = "redirect URI not provided"
raise InvalidInvocation(msg)
if implicit and not isinstance(self, UntrustedAuthenticator):
msg = (
"Only UntrustedAuthenticator instances can use the implicit grant flow."
)
msg = "Only UntrustedAuthenticator instances can use the implicit grant flow."
raise InvalidInvocation(msg)
if implicit and duration != "temporary":
msg = "The implicit grant flow only supports temporary access tokens."
Expand All @@ -108,9 +102,9 @@ def authorize_url(
}
url = self._requestor.reddit_url + const.AUTHORIZATION_PATH
request = Request("GET", url, params=params)
return request.prepare().url
return cast(str, request.prepare().url)

def revoke_token(self, token: str, token_type: str | None = None):
def revoke_token(self, token: str, token_type: str | None = None) -> None:
"""Ask Reddit to revoke the provided token.
:param token: The access or refresh token to revoke.
Expand All @@ -123,15 +117,15 @@ def revoke_token(self, token: str, token_type: str | None = None):
if token_type is not None:
data["token_type_hint"] = token_type
url = self._requestor.reddit_url + const.REVOKE_TOKEN_PATH
self._post(url, **data)
self._post(url=url, **data)


class BaseAuthorizer:
"""Superclass for OAuth2 authorization tokens and scopes."""

AUTHENTICATOR_CLASS: tuple | type = BaseAuthenticator

def __init__(self, authenticator: BaseAuthenticator):
def __init__(self, authenticator: BaseAuthenticator) -> None:
"""Represent a single authorization to Reddit's API.
:param authenticator: An instance of :class:`.BaseAuthenticator`.
Expand All @@ -152,13 +146,9 @@ def _request_token(self, **data: Any):
response = self._authenticator._post(url=url, **data)
payload = response.json()
if "error" in payload: # Why are these OKAY responses?
raise OAuthException(
response, payload["error"], payload.get("error_description")
)
raise OAuthException(response, payload["error"], payload.get("error_description"))

self._expiration_timestamp_ns = (
pre_request_timestamp_ns + (payload["expires_in"] + 10) * const.NANOSECONDS
)
self._expiration_timestamp_ns = pre_request_timestamp_ns + (payload["expires_in"] + 10) * const.NANOSECONDS
self.access_token = payload["access_token"]
if "refresh_token" in payload:
self.refresh_token = payload["refresh_token"]
Expand All @@ -170,9 +160,7 @@ def _validate_authenticator(self):
if isinstance(self.AUTHENTICATOR_CLASS, type):
msg += f" {self.AUTHENTICATOR_CLASS.__name__}."
else:
msg += (
f" {' or '.join([i.__name__ for i in self.AUTHENTICATOR_CLASS])}."
)
msg += f" {' or '.join([i.__name__ for i in self.AUTHENTICATOR_CLASS])}."
raise InvalidInvocation(msg)

def is_valid(self) -> bool:
Expand All @@ -182,12 +170,9 @@ def is_valid(self) -> bool:
valid on the server side.
"""
return (
self.access_token is not None
and time.monotonic_ns() < self._expiration_timestamp_ns
)
return self.access_token is not None and time.monotonic_ns() < self._expiration_timestamp_ns

def revoke(self):
def revoke(self) -> None:
"""Revoke the current Authorization."""
if self.access_token is None:
msg = "no token available to revoke"
Expand All @@ -208,7 +193,7 @@ def __init__(
client_id: str,
client_secret: str,
redirect_uri: str | None = None,
):
) -> None:
"""Represent a single authentication to Reddit's API.
:param requestor: An instance of :class:`.Requestor`.
Expand Down Expand Up @@ -245,7 +230,7 @@ def __init__(
post_refresh_callback: Callable[[Authorizer], None] | None = None,
pre_refresh_callback: Callable[[Authorizer], None] | None = None,
refresh_token: str | None = None,
):
) -> None:
"""Represent a single authorization to Reddit's API.
:param authenticator: An instance of a subclass of :class:`.BaseAuthenticator`.
Expand All @@ -267,7 +252,7 @@ def __init__(
self._pre_refresh_callback = pre_refresh_callback
self.refresh_token = refresh_token

def authorize(self, code: str):
def authorize(self, code: str) -> None:
"""Obtain and set authorization tokens based on ``code``.
:param code: The code obtained by an out-of-band authorization request to
Expand All @@ -283,20 +268,18 @@ def authorize(self, code: str):
redirect_uri=self._authenticator.redirect_uri,
)

def refresh(self):
def refresh(self) -> None:
"""Obtain a new access token from the refresh_token."""
if self._pre_refresh_callback:
self._pre_refresh_callback(self)
if self.refresh_token is None:
msg = "refresh token not provided"
raise InvalidInvocation(msg)
self._request_token(
grant_type="refresh_token", refresh_token=self.refresh_token
)
self._request_token(grant_type="refresh_token", refresh_token=self.refresh_token)
if self._post_refresh_callback:
self._post_refresh_callback(self)

def revoke(self, only_access: bool = False):
def revoke(self, only_access: bool = False) -> None:
"""Revoke the current Authorization.
:param only_access: When explicitly set to ``True``, do not evict the refresh
Expand Down Expand Up @@ -325,7 +308,7 @@ def __init__(
access_token: str,
expires_in: int,
scope: str,
):
) -> None:
"""Represent a single implicit authorization to Reddit's API.
:param authenticator: An instance of :class:`.UntrustedAuthenticator`.
Expand All @@ -341,9 +324,7 @@ def __init__(
"""
super().__init__(authenticator)
self._expiration_timestamp_ns = (
time.monotonic_ns() + expires_in * const.NANOSECONDS
)
self._expiration_timestamp_ns = time.monotonic_ns() + expires_in * const.NANOSECONDS
self.access_token = access_token
self.scopes = set(scope.split(" "))

Expand All @@ -362,7 +343,7 @@ def __init__(
self,
authenticator: BaseAuthenticator,
scopes: list[str] | None = None,
):
) -> None:
"""Represent a ReadOnly authorization to Reddit's API.
:param scopes: A list of OAuth scopes to request authorization for (default:
Expand All @@ -372,7 +353,7 @@ def __init__(
super().__init__(authenticator)
self._scopes = scopes

def refresh(self):
def refresh(self) -> None:
"""Obtain a new ReadOnly access token."""
additional_kwargs = {}
if self._scopes:
Expand All @@ -397,7 +378,7 @@ def __init__(
password: str | None,
two_factor_callback: Callable | None = None,
scopes: list[str] | None = None,
):
) -> None:
"""Represent a single personal-use authorization to Reddit's API.
:param authenticator: An instance of :class:`.TrustedAuthenticator`.
Expand All @@ -416,7 +397,7 @@ def __init__(
self._two_factor_callback = two_factor_callback
self._username = username

def refresh(self):
def refresh(self) -> None:
"""Obtain a new personal-use script type access token."""
additional_kwargs = {}
if self._scopes:
Expand Down Expand Up @@ -447,7 +428,7 @@ def __init__(
authenticator: BaseAuthenticator,
device_id: str | None = None,
scopes: list[str] | None = None,
):
) -> None:
"""Represent an app-only OAuth2 authorization for 'installed' apps.
:param authenticator: An instance of :class:`.UntrustedAuthenticator` or
Expand All @@ -466,7 +447,7 @@ def __init__(
self._device_id = device_id
self._scopes = scopes

def refresh(self):
def refresh(self) -> None:
"""Obtain a new access token."""
additional_kwargs = {}
if self._scopes:
Expand Down
Loading

0 comments on commit 0f15dd2

Please sign in to comment.