-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathroutes.py
68 lines (50 loc) · 2.31 KB
/
routes.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
from fastapi import (
APIRouter,
Body,
Request,
Response,
HTTPException,
status,
)
from fastapi.encoders import jsonable_encoder
from typing import List
from models import Book, BookUpdate
# Router on which the book endpoints defined below are registered.
router = APIRouter()
@router.post(
    "/",
    response_description="Create a new book",
    status_code=status.HTTP_201_CREATED,
    response_model=Book,
)
def create_book(request: Request, book: Book = Body(...)):
    """Insert the submitted book and return the stored document.

    The payload is run through ``jsonable_encoder`` before insertion. The
    document is then read back by its inserted id, so the response is what
    the ``books`` collection returns rather than the raw request body.
    """
    books = request.app.database["books"]
    payload = jsonable_encoder(book)
    inserted_id = books.insert_one(payload).inserted_id
    return books.find_one({"_id": inserted_id})
@router.get(
    "/",
    response_description="List all books",
    response_model=List[Book],
)
def list_books(request: Request):
    """Return at most 100 documents from the ``books`` collection."""
    cursor = request.app.database["books"].find(limit=100)
    return list(cursor)
@router.get(
    "/{id}",
    response_description="Get a single book by id",
    response_model=Book,
)
def find_book(id: str, request: Request):
    """Look up one book by its ``_id``.

    Raises:
        HTTPException: 404 when no document has the given id.
    """
    match = request.app.database["books"].find_one({"_id": id})
    if match is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Book with ID {id} not found",
        )
    return match
@router.put("/{id}", response_description="Update a book", response_model=Book)
def update_book(id: str, request: Request, book: BookUpdate = Body(...)):
    """Apply the non-null fields of the payload to a book and return it.

    Fields whose value is ``None`` are dropped from the update. If nothing
    remains, no write is issued and the current document is returned as is.

    Raises:
        HTTPException: 404 when no document has the given id.
    """
    books = request.app.database["books"]
    changes = {k: v for k, v in book.dict().items() if v is not None}

    if changes:
        result = books.update_one({"_id": id}, {"$set": changes})
        # Check matched_count rather than modified_count: an update whose
        # values equal what is already stored matches the document but
        # modifies nothing, and that must not be reported as "not found".
        if result.matched_count == 0:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Book with ID {id} not found",
            )

    existing_book = books.find_one({"_id": id})
    if existing_book is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Book with ID {id} not found",
        )
    return existing_book
@router.delete("/{id}", response_description="Delete a book")
def delete_book(id: str, request: Request, response: Response):
    """Delete the book with the given ``_id``.

    On success the injected response is returned with status 204.

    Raises:
        HTTPException: 404 when the delete did not remove exactly one
            document.
    """
    outcome = request.app.database["books"].delete_one({"_id": id})
    if outcome.deleted_count != 1:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Book with ID {id} not found",
        )

    response.status_code = status.HTTP_204_NO_CONTENT
    return response