Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Basic access logging for admins #72

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 80 additions & 4 deletions app/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,18 +256,22 @@ def serve_logos(path):
#######

from flask_admin.contrib.sqla import ModelView
from app.api.models import Pods, Urls, User, Personalization, Suggestions
from app.utils_db import delete_url_representations, delete_pod_representations, \
rm_from_npz, add_to_npz, create_pod_in_db, create_pod_npz_pos, rm_doc_from_pos, update_db_idvs_after_npz_delete
from app.api.models import AccessLogs, Pods, Urls, User, Personalization, Suggestions
from app.utils_db import (
create_access_log_entry, compute_daily_access_stats, delete_url_representations, delete_pod_representations, rm_from_npz, add_to_npz, create_pod_in_db,
create_pod_npz_pos, rm_doc_from_pos, update_db_idvs_after_npz_delete
)

from flask_admin import expose
from flask_admin.contrib.sqla.view import ModelView
from flask_admin.model.template import EndpointLinkRowAction

# Authentification
class MyLoginManager(LoginManager):
unauthorized_status_code = 404

def unauthorized(self):
return abort(404)
return abort(self.unauthorized_status_code)

login_manager = MyLoginManager()
login_manager.login_view = 'auth.login'
Expand All @@ -281,17 +285,80 @@ def load_user(user_id):
return User.query.get(int(user_id))


# Admin access logs
@app.after_request
def log_endpoint_accessed(response):
if "/admin/" in request.url:
return response

user_logged_in = user_is_confirmed = user_is_admin = False
user_email = None
user_id = -1
if current_user.is_authenticated:
user_id = current_user.id
user_logged_in = True
user_email = current_user.email
user_is_confirmed = current_user.is_confirmed
user_is_admin = current_user.is_admin

if request.endpoint is None:
endpoint = ""
else:
endpoint = request.endpoint
if endpoint in ["static", "serve_sw"]:
event_type = "load_resource"
elif endpoint.startswith("api."):
event_type = "api_request"
elif request.method == "POST":
event_type = "form_submit"
else:
event_type = "view_page"

create_access_log_entry(
user_logged_in,
user_id,
user_is_confirmed,
user_is_admin,
user_email,
event_type,
request.endpoint,
request.url,
response.status_code,
None
)
return response


# Flask and Flask-SQLAlchemy initialization here

from app.auth.decorators import log_auth_failure
def can_access_flaskadmin():
# replaces what the authentication decorators do, for modelview endpoints
if not current_user.is_authenticated:
log_auth_failure(404, "unauthenticated user tried to access admin modelview endpoint")
return abort(404)
if not current_user.is_admin:
"non-admin user tried to access admin modelview endpoint"
return abort(404)
return True

class MyAdminIndexView(AdminIndexView):

@expose()
def index(self):
def _format_stats(stats):
fmt_stats = {}
for stat_name, stat_val in stats.items():
if "_diff" in stat_name:
fmt_stats[stat_name] = f"{'+' if stat_val > 0 else ''}{stat_val}"
else:
fmt_stats[stat_name] = str(stat_val)
return fmt_stats

access_stats = compute_daily_access_stats()

return self.render(self._template, access_stats=_format_stats(access_stats))

def is_accessible(self):
return can_access_flaskadmin()

Expand Down Expand Up @@ -475,11 +542,20 @@ class SuggestionsModelView(ModelView):
def is_accessible(self):
return can_access_flaskadmin()

class AccessLogsModelView(ModelView):
list_template = 'admin/pears_list.html'
column_searchable_list = ['user_id', 'event_type', 'endpoint']
can_edit = False
page_size = 50
def is_accessible(self):
return can_access_flaskadmin()

admin.add_view(PodsModelView(Pods, db.session))
admin.add_view(UrlsModelView(Urls, db.session))
admin.add_view(UsersModelView(User, db.session))
admin.add_view(PersonalizationModelView(Personalization, db.session))
admin.add_view(SuggestionsModelView(Suggestions, db.session))
admin.add_view(AccessLogsModelView(AccessLogs, db.session))

@app.errorhandler(404)
def page_not_found(e):
Expand Down
54 changes: 54 additions & 0 deletions app/api/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,60 @@ class Base(db.Model):
onupdate=db.func.current_timestamp())


class AccessLogs(db.Model):
id = db.Column(db.Integer, primary_key=True)
log_date = db.Column(db.DateTime, default=db.func.current_timestamp())
user_logged_in = db.Column(db.Boolean)
user_id = db.Column(db.Integer)
user_email = db.Column(db.String(1000))
user_is_confirmed = db.Column(db.Boolean)
user_is_admin = db.Column(db.Boolean)
event_type = db.Column(db.String(1000))
endpoint = db.Column(db.String(1000))
requested_url = db.Column(db.String(1000))
response_code = db.Column(db.Integer)
messages = db.Column(db.String(1000))

def __init__(
self,
logged_in,
user_id,
user_is_confirmed,
user_is_admin,
user_email,
event_type,
endpoint,
requested_url,
response_code,
messages
):
self.user_logged_in = logged_in
self.user_id = user_id
self.user_email = user_email
self.user_is_confirmed = user_is_confirmed
self.user_is_admin = user_is_admin
self.event_type = event_type
self.endpoint = endpoint
self.requested_url = requested_url
self.response_code = response_code
self.messages = messages

@property
def serialize(self):
return {
"id": self.id,
"log_date": self.log_date,
"logged_in": self.logged_in,
"user_id": self.user_id,
"user_email": self.user_email,
"event_type": self.event_type,
"endpoint": self.endpoint,
"requested_url": self.requested_url,
"response_code": self.response_code,
"messages": self.messages
}


class Suggestions(Base):
id = db.Column(db.Integer, primary_key=True)
url = db.Column(db.String(1000))
Expand Down
Loading
Loading