-
Notifications
You must be signed in to change notification settings - Fork 1
/
main.py
153 lines (126 loc) · 4.08 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
import base64
from pathlib import Path

from deta import Base, Drive
from fastapi import FastAPI, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, HTMLResponse, JSONResponse, Response
from fastapi.templating import Jinja2Templates

from models import Image
# ASGI application; every route below is registered on this instance.
app = FastAPI()
# CORS middleware configured to accept requests from any origin.
app.add_middleware(CORSMiddleware, allow_origins=["*"])
# Jinja2 template loader rooted at the "templates" directory.
pages = Jinja2Templates(directory="templates")
# Deta Base holding one metadata record per image (ext, visibility, embed).
cdn = Base("images")
# Deta Drive holding the image bytes, stored under "<key>.<ext>".
images = Drive("images")
class NoCacheFileResponse(FileResponse):
    """A ``FileResponse`` that always carries ``Cache-Control: no-cache``."""

    def __init__(self, path: str, **kwargs):
        """Create the file response, then stamp the cache header on it.

        Args:
            path: Location of the file to send.
            **kwargs: Passed through untouched to ``FileResponse``.
        """
        super(NoCacheFileResponse, self).__init__(path, **kwargs)
        header_name, header_value = "Cache-Control", "no-cache"
        self.headers[header_name] = header_value
@app.get("/", response_class=HTMLResponse)
async def dashboard(request: Request):
    """Render ``dashboard.html`` with every record held in the ``cdn`` Base."""
    page = cdn.fetch()
    items = page.items
    # The fetch is paginated: keep following the continuation key until
    # the Base reports there is nothing left.
    while page.last:
        page = cdn.fetch(last=page.last)
        items.extend(page.items)
    context = {"request": request, "items": items}
    return pages.TemplateResponse("dashboard.html", context)
@app.get("/image", response_class=HTMLResponse)
async def image_upload_page(request: Request):
    """Serve the upload form (``upload.html``)."""
    context = {"request": request}
    return pages.TemplateResponse("upload.html", context)
@app.post("/upload")
async def image_upload(request: Request, image: Image):
    """Store an uploaded image and return its CDN URL.

    The text after the first comma of ``image.content`` is base64-decoded and
    saved to the ``images`` Drive as ``<key>.<ext>``; a metadata record is
    created in the ``cdn`` Base. (The content appears to be a data URL such as
    ``data:image/png;base64,...`` -- confirm against the upload page.)

    Args:
        request: Incoming request; its scheme and hostname build the URL.
        image: Upload payload providing ``filename`` and ``content``.

    Returns:
        ``{"image": <url>, "id": <record key>}``.

    Raises:
        HTTPException: 400 when the content has no comma-separated payload or
            the payload is not valid base64.
    """
    extension = image.filename.split(".")[-1]

    parts = image.content.split(",")
    if len(parts) < 2:
        raise HTTPException(
            status_code=400,
            detail="Image content must be a base64 data URL",
        )

    # Decode BEFORE creating the metadata record, so a bad payload does not
    # leave behind a record with no file in the Drive.
    try:
        raw_bytes = base64.b64decode(parts[1].encode("utf-8"))
    except ValueError as err:  # binascii.Error is a ValueError subclass
        raise HTTPException(
            status_code=400,
            detail="Image content is not valid base64",
        ) from err

    item = cdn.put(
        {
            # Why non-US spelling? Might come as a surprise to users of the API.
            # Just the way it is, not trying to say US is superior. -- LemonPi314
            # Don't take the ou out of colour --SlumberDemon
            "ext": extension,
            "visibility": False,
            "embed": [{"title": "", "colour": "000000"}],
        },
    )
    key = item["key"]
    images.put(f"{key}.{extension}", raw_bytes)
    url = f"{request.url.scheme}://{request.url.hostname}/cdn/{key}.{extension}"
    return {"image": url, "id": key}
@app.get("/info/{id}", response_class=HTMLResponse)
async def image_info(request: Request, id: str):
    """Render ``info.html`` for the record stored under ``id``."""
    context = {"request": request, "data": cdn.get(id)}
    return pages.TemplateResponse("info.html", context)
@app.get("/data/{id}")
async def image_data(id: str):
    """Return the raw metadata record stored under ``id``."""
    return cdn.get(id)
# to deliver static files without caching
# (Done by jnsougata... smh -- LemonPi314)
@app.get("/static/{path:path}")
async def static(path: str):
    """Serve a file from ``./static`` with caching disabled.

    Args:
        path: File path relative to the ``static`` directory, taken from the URL.

    Returns:
        A ``NoCacheFileResponse`` for the requested file.

    Raises:
        HTTPException: 404 when the path resolves outside ``./static`` or does
            not name an existing file.
    """
    static_root = Path("./static").resolve()
    target = (static_root / path).resolve()
    # ``path`` is untrusted: after resolving ``..`` segments and symlinks the
    # target must still sit inside the static directory, otherwise a request
    # such as ``/static/../main.py`` would leak arbitrary files.
    if static_root not in target.parents or not target.is_file():
        raise HTTPException(status_code=404, detail="Not found")
    return NoCacheFileResponse(str(target))
@app.patch("/update/{id}")
async def image_update(
    id: str,
    embed_title: str = None,
    embed_colour_hex: str = None,
    visibility: bool = None,
):
    """Update the embed title, embed colour and/or visibility of an image.

    Each argument left as ``None`` keeps the value currently stored; each
    argument that is supplied replaces the stored value.

    Args:
        id: Key of the record in the ``cdn`` Base.
        embed_title: New embed title, or ``None`` to keep the current one.
        embed_colour_hex: New embed colour, or ``None`` to keep the current one.
        visibility: New visibility flag, or ``None`` to keep the current one.

    Returns:
        The resulting ``id``, ``visibility``, ``title`` and ``colour``.

    Raises:
        HTTPException: 404 when no record exists for ``id``.
    """
    data = cdn.get(id)
    if data is None:
        raise HTTPException(status_code=404, detail="Image not found")

    current_embed = data["embed"][0]
    # Compare against None (not truthiness) so an explicit ``False`` or empty
    # title is honoured. The previous logic was inverted: it only applied a
    # new value when the stored one was falsy, so the colour could never
    # change and visibility could never be switched back off.
    if embed_title is None:
        embed_title = current_embed["title"]
    if embed_colour_hex is None:
        embed_colour_hex = current_embed["colour"]
    if visibility is None:
        visibility = data["visibility"]

    cdn.update(
        {
            "visibility": visibility,
            "embed": [{"title": embed_title, "colour": f"{embed_colour_hex}"}],
        },
        key=id,
    )
    return {
        "id": id,
        "visibility": visibility,
        "title": embed_title,
        "colour": embed_colour_hex,
    }
@app.delete("/delete/{id}")
async def image_delete(id: str):
    """Delete an image's file from the Drive and its record from the Base.

    Args:
        id: Key of the record in the ``cdn`` Base.

    Returns:
        ``{"id": id}`` once both the file and the record are removed.

    Raises:
        HTTPException: 404 when no record exists for ``id``.
    """
    data = cdn.get(id)
    # Without this guard a missing record fails on ``data['key']`` with a
    # TypeError and surfaces as a 500.
    if data is None:
        raise HTTPException(status_code=404, detail="Image not found")
    images.delete(f"{data['key']}.{data['ext']}")
    cdn.delete(id)
    return {"id": id}
@app.get("/cdn/{image}")
async def image_cdn(image: str):
    """Serve an image's bytes, but only when its record is marked visible.

    Args:
        image: File name in the form ``<key>.<ext>``.

    Returns:
        A ``Response`` with the image bytes and media type ``image/<ext>``,
        or a JSON body ``{"error": 404}`` sent with HTTP status 404 when the
        name is malformed, the record or file is missing, or the image is not
        visible.
    """
    # Same body as before, but now with a real 404 status instead of 200.
    not_found = JSONResponse({"error": 404}, status_code=404)

    parts = image.split(".")
    if len(parts) < 2:
        return not_found
    key, extension = parts[0], parts[1]

    # Check the metadata first so hidden or unknown images never trigger a
    # Drive download.
    info = cdn.get(key)
    if info is None or info.get("visibility") is not True:
        return not_found

    img = images.get(image)
    if img is None:
        return not_found
    try:
        content = img.read()
    finally:
        # Release the streaming body once the bytes are in memory.
        img.close()
    return Response(content, media_type=f"image/{extension}")
@app.get("/embed/{image}")
async def image_cdn_embed(request: Request, image: str):
    """Render ``embed.html`` for the record whose key precedes the first dot."""
    record_key = image.split(".")[0]
    context = {"request": request, "embed": cdn.get(record_key)}
    return pages.TemplateResponse("embed.html", context)
@app.get("/assets/{image}")
async def image_assets(image: str):
    """Serve an image's bytes straight from the Drive, ignoring visibility.

    Args:
        image: File name in the form ``<key>.<ext>``.

    Returns:
        A ``Response`` with the image bytes and media type ``image/<ext>``.

    Raises:
        HTTPException: 404 when the name has no extension or no such file
            exists in the Drive.
    """
    parts = image.split(".")
    if len(parts) < 2:
        raise HTTPException(status_code=404, detail="Image not found")

    img = images.get(image)
    # A missing file comes back as None; without this guard ``img.read()``
    # raises AttributeError and surfaces as a 500.
    if img is None:
        raise HTTPException(status_code=404, detail="Image not found")
    try:
        content = img.read()
    finally:
        # Release the streaming body once the bytes are in memory.
        img.close()
    return Response(content, media_type=f"image/{parts[1]}")