-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathapp.py
58 lines (49 loc) · 1.99 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
import streamlit as st
from streamlit import config
import threading
import time
from persistence import get_latest_leaderboard
from persistence import save_location
from celery_utils import start_celery_worker
from celery_utils import start_celery_beat
from celery_utils import is_celery_beat_working
from celery_utils import are_workers_running
from celery_utils import stop_celery
from formatting import render_leaderboard
from streamlit_js_eval import get_page_location
# Page header.
st.title("Github Issues Leaderboard")

# Pass the current page location to save_location(). A TypeError raised by
# either call is swallowed so the rest of the page still renders.
# NOTE(review): presumably get_page_location() has no value yet on the first
# render, which is what triggers the TypeError - confirm against the helpers.
try:
    save_location(get_page_location())
except TypeError:
    pass

# Render the latest leaderboard, or a warning when nothing is available yet.
leaderboard = get_latest_leaderboard()
if leaderboard:
    render_leaderboard(leaderboard)
else:
    st.warning("No results yet...")
def _rerun_app():
    """Restart the current script run.

    Prefers ``st.rerun`` and falls back to the older
    ``st.experimental_rerun`` when the installed Streamlit does not expose
    the newer name, so the buttons below work on both old and new releases.
    """
    rerun = getattr(st, "rerun", None) or st.experimental_rerun
    rerun()


def _pause_then_rerun(spinner_text):
    """Show a spinner labelled *spinner_text* for one second, then rerun."""
    with st.spinner(spinner_text):
        # Fixed one-second pause before the status checks are evaluated
        # again on the next script run.
        time.sleep(1)
    _rerun_app()


def _start_in_background(target, thread_name, spinner_text):
    """Run *target* on a daemon thread, then pause briefly and rerun the app."""
    threading.Thread(target=target, name=thread_name, daemon=True).start()
    _pause_then_rerun(spinner_text)


# Sidebar section with buttons to start Celery beat, start Celery workers,
# and stop Celery. Each start button is disabled while its status check
# reports True; the stop button is disabled when both checks report False.
with st.sidebar:
    with st.expander("⚙️ Celery controls"):
        is_beat_working = is_celery_beat_working()
        label = "💓 Start Celery Beat!" if not is_beat_working else "💓 Celery Beat Started!"
        if st.button(label, disabled=is_beat_working):
            _start_in_background(
                start_celery_beat,
                "Celery beat control thread",
                "Starting Celery Beat...",
            )

        are_workers_started = are_workers_running()
        label = "🔨Start Celery Workers!" if not are_workers_started else "🔨 Celery Workers Started!"
        if st.button(label, disabled=are_workers_started):
            _start_in_background(
                start_celery_worker,
                "Celery worker control thread",
                "Starting Celery Workers...",
            )

        if st.button("🛑 Stop celery", disabled=not (is_beat_working or are_workers_started)):
            # stop_celery() is called directly on the script thread, unlike
            # the start helpers which are handed to a background thread.
            stop_celery()
            _pause_then_rerun("Stopping Celery...")