-
This is my validation code: class AuthorizationError(Exception):
""" A base class for exceptions used by bottle. """
pass
def jwt_token_from_header():
auth = request.headers.get('Authorization', None)
if not auth:
raise AuthorizationError({'code': 'authorization_header_missing', 'description': 'Authorization header is expected'})
parts = auth.split()
if parts[0].lower() != 'bearer':
raise AuthorizationError({'code': 'invalid_header', 'description': 'Authorization header must start with Bearer'})
elif len(parts) == 1:
raise AuthorizationError({'code': 'invalid_header', 'description': 'Token not found'})
elif len(parts) > 2:
raise AuthorizationError({'code': 'invalid_header', 'description': 'Authorization header must be Bearer + \s + token'})
return parts[1] and here the function using the validation code (please ignore pieces like pydal rest, just the validating of the JWT is important): @action('api/<tablename>/')
@action('api/<tablename>/<rec_id>')
def api(tablename, rec_id=None):
token = jwt_token_from_header()
if token:
try:
jwt.decode(token, 'secret', algorithms=['HS256'])
return RestAPI(db, policy)(request.method,
tablename,
rec_id,
request.GET,
request.POST
)
except jwt.ExpiredSignatureError:
return ("Token Expired")
except jwt.InvalidSignatureError:
return ("JWT Signature failed!")
else:
return ('Token required!') These are the most relevant imports I think: from pydal.validators import CRYPT
import jwt
from datetime import datetime, timedelta
import json In combination with creating a JWT (other discussion). Thank you! |
Beta Was this translation helpful? Give feedback.
Replies: 3 comments
-
Maybe you can help me here too when you have a moment. That is the last piece of the puzzle. The rest I think I can learn by doing ':) |
Beta Was this translation helpful? Give feedback.
-
I would use a import jwt
from emmett import App, Pipe, request, response
from emmett.tools import ServicePipe
class JWTValidate(Pipe):
def get_header_value(self):
rv, err, header = None, None, request.headers.get("authorization", "")
try:
spec, rv = header.split()
if spec.lower() != "bearer":
err = "Authorization header must start with Bearer"
except:
err = "Authorization header must be Bearer + \s + token"
return rv, err
async def pipe(self, next_pipe, **kwargs):
token, err = self.get_header_value()
if err:
response.code = 400
return {"code": "invalid_header", "description": err}
if not token:
response.code = 400
return {"code": "invalid_header", "description": "Token not found"}
try:
jwt.decode(token, 'secret', algorithms=['HS256'])
except jwt.ExpiredSignatureError:
response.code = 401
return {"code": "not_authorized", "description": "Token expired"}
except jwt.InvalidSignatureError:
response.code = 401
return {"code": "not_authorized", "description": "JWT Signature failed"}
return await next_pipe(**kwargs)
app = App(__name__)
api = app.module(__name__, "api", url_prefix="api")
api.pipeline = [
ServicePipe("json"),
JWTValidate()
] Note: in case you're building REST json APIs, I suggest you to check the REST extension |
Beta Was this translation helpful? Give feedback.
-
Thank you! Wow this really shows how powerful Emmett is. |
Beta Was this translation helpful? Give feedback.
I would use a
Pipe
(see https://emmett.sh/docs/2.2.x/pipeline) and put all the routes requiring such validation within a module (see https://emmett.sh/docs/2.2.x/app_and_modules#application-modules), like: