Skip to content

Commit

Permalink
core: if 401 on 2nd attempt, avoid anon tokens
Browse files Browse the repository at this point in the history
in the first flow using auth backend for token:
1. try do_request with no auths at all
2. the attempt to gain an anon token is success,
but then the request fails with 401
3. at this point, in the third attempt, give
chance to the flow to request a token but avoid
any anon tokens.

Please note: this happens effectively only on the
first run of the flow. Subsequent do_request flow
invocations should just succeed now on the 1st
request by re-using the token --simplified
behaviour introduced with this proposal

Signed-off-by: tarilabs <[email protected]>
  • Loading branch information
tarilabs committed Jul 6, 2024
1 parent 2dcf321 commit 6e22667
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 8 deletions.
19 changes: 11 additions & 8 deletions oras/auth/token.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ def reset_basic_auth(self):
self.set_header("Authorization", "Basic %s" % self._basic_auth)

def authenticate_request(
self, original: requests.Response, headers: dict, refresh=False
self, original: requests.Response, headers: dict, refresh=False, skipAnonToken=False
):
"""
Authenticate Request
Expand Down Expand Up @@ -73,17 +73,20 @@ def authenticate_request(
h = auth_utils.parse_auth_header(authHeaderRaw)

# First try to request an anonymous token
logger.debug("No Authorization, requesting anonymous token")
anon_token = self.request_anonymous_token(h)
if anon_token:
logger.debug("Successfully obtained anonymous token!")
self.token = anon_token
headers["Authorization"] = "Bearer %s" % self.token
return headers, True
if not skipAnonToken:
logger.debug("No Authorization, requesting anonymous token")
anon_token = self.request_anonymous_token(h)
if anon_token:
logger.debug("Successfully obtained anonymous token!")
self.token = anon_token
headers["Authorization"] = "Bearer %s" % self.token
return headers, True

# Next try for logged in token
logger.debug("requesting auth token")
token = self.request_token(h)
if token:
logger.debug("Successfully obtained auth token!")
self.token = token
headers["Authorization"] = "Bearer %s" % self.token
return headers, True
Expand Down
14 changes: 14 additions & 0 deletions oras/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -972,5 +972,19 @@ def do_request(
stream=stream,
verify=self._tls_verify,
)
# ...or attempt exchange anon token for auth token if 401
if response.status_code == 401:
headers, changed = self.auth.authenticate_request(
response, headers, refresh=True, skipAnonToken=True
)
response = self.session.request(
method,
url,
data=data,
json=json,
headers=headers,
stream=stream,
verify=self._tls_verify,
)

return response

0 comments on commit 6e22667

Please sign in to comment.