-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathserver.py
140 lines (108 loc) · 4.55 KB
/
server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
import asyncio
import os
import uuid
import ssl
from aiohttp import web
from aiortc import RTCPeerConnection, RTCSessionDescription, RTCIceCandidate, RTCDataChannel
from dotenv import load_dotenv
from server.helpers.get_log_info import get_log_info
from server.tracks.open_face_process_track import OpenFaceProcessTrack
# Directory that contains this module; bundled files are located relative to it.
ROOT = os.path.dirname(__file__)

# Registry of the peer connections created by the offer handler, kept so
# that they can be closed later.
pcs = set()
async def index(request):
    """Serve ``public/index.html`` (resolved under ``ROOT``) as an HTML page.

    Args:
        request: The incoming aiohttp request; not inspected.

    Returns:
        A ``web.Response`` with content type ``text/html`` whose body is the
        text of the file.

    Raises:
        OSError: If the page cannot be opened or read.
    """
    page_path = os.path.join(ROOT, "public/index.html")
    # The context manager closes the handle deterministically rather than
    # leaving it to the garbage collector. The explicit encoding keeps the
    # decoding independent of the host locale.
    with open(page_path, "r", encoding="utf-8") as page:
        content = page.read()
    return web.Response(content_type="text/html", text=content)
async def offer(request):
    """Answer a client's WebRTC offer and set up its peer connection.

    The request body must be JSON with the keys ``sdp`` and ``type``
    describing the remote offer. A new ``RTCPeerConnection`` is created,
    registered in ``pcs``, given handlers for connection-state changes and
    incoming tracks, and then used to produce the SDP answer.

    Args:
        request: The aiohttp request carrying the JSON offer.

    Returns:
        A JSON response with the keys ``sdp`` and ``type`` taken from the
        connection's local description.

    Raises:
        KeyError: If ``sdp`` or ``type`` is missing from the JSON body.
    """
    params = await request.json()
    # Deliberately not called ``offer`` so the handler's own name is not shadowed.
    remote_description = RTCSessionDescription(sdp=params["sdp"], type=params["type"])

    pc = RTCPeerConnection()
    pc_id = "PeerConnection(%s)" % uuid.uuid4()
    pcs.add(pc)

    # Logging callable obtained for this connection's id.
    log_info = get_log_info(pc_id)
    log_info("Created for %s", request.remote)

    # Disabled experiments, kept for reference.
    # Create a data channel
    # data_channel = pc.createDataChannel("graphs", negotiated=True, id=0)
    # Set up event listeners for the data channel
    # @data_channel.on("open")
    # def on_data_channel_open():
    #     print("Data channel is open")
    #     # data_channel.send("Hello, World!")
    #
    # @data_channel.on("message")
    # def on_data_channel_message(message):
    #     print(f"Received message: {message}")
    #
    # @data_channel.on("close")
    # def on_datachannel_close():
    #     print("Data channel is closed")
    # player = MediaPlayer(os.path.join(ROOT, "sample.ts"))
    # unprocessed_video_capture_track = OpenCVCapture(processed=False)
    # pc.addTrack(unprocessed_video_capture_track)
    #
    # processed_video_capture_track = OpenCVCapture(processed=True)
    # pc.addTrack(processed_video_capture_track)
    #
    # audio_capture_track = AudioCapture()
    # pc.addTrack(audio_capture_track)

    @pc.on("connectionstatechange")
    async def on_connectionstatechange():
        log_info("Connection state is %s", pc.connectionState)
        if pc.connectionState in ("failed", "closed", "disconnected", "consent_expired"):
            # Tear down only this connection. Closing every entry in ``pcs``
            # here would disconnect all other clients whenever one drops.
            # Unregister before awaiting so that a concurrent bulk close
            # does not pick this connection up a second time.
            pcs.discard(pc)
            await pc.close()

    # @pc.on('icecandidate')
    # async def on_icecandidate(candidate):
    #     log_info("ICE candidate received", candidate)
    #     # await request.app['websockets'].send_json({'candidate': candidate})

    @pc.on("track")
    def on_track(track):
        log_info("Track %s received", track.kind)

        if track.kind == "video":
            # Wrap the incoming video in the processing track and send the
            # result back to the client over the same connection.
            local_video = OpenFaceProcessTrack(track, pc)
            pc.addTrack(local_video)

        @track.on("ended")
        async def on_ended():
            log_info("Track %s ended", track.kind)

    # handle offer
    await pc.setRemoteDescription(remote_description)

    # send answer
    answer = await pc.createAnswer()
    await pc.setLocalDescription(answer)

    return web.json_response({'sdp': pc.localDescription.sdp, 'type': pc.localDescription.type})
# async def ice_candidate(request):
# params = await request.json()
# candidate_param = parse_candidate(params['candidate'])
#
# candidate = RTCIceCandidate(sdpMid=params['sdpMid'],
# sdpMLineIndex=params['sdpMLineIndex'],
# component=candidate_param['component'],
# foundation=candidate_param['foundation'],
# ip=candidate_param['ip'],
# port=candidate_param['port'],
# priority=candidate_param['priority'],
# protocol=candidate_param['protocol'],
# tcpType=candidate_param['tcpType'],
# type=candidate_param['type'],
# )
# for pc in pcs:
# await pc.addIceCandidate(candidate)
# return web.Response()
async def close_peer_connections(app):
    """Close every registered peer connection and empty the ``pcs`` registry.

    Args:
        app: Accepted so the function can be used as an application
            shutdown handler; it is not used.

    Raises:
        Exception: Whatever the first failing ``close()`` call raises is
            propagated by ``asyncio.gather``.
    """
    # Take a snapshot and empty the registry *before* awaiting. A connection
    # registered while the closes are in flight then stays in ``pcs``
    # instead of being dropped unclosed, and an overlapping call finds
    # nothing left to close a second time.
    pending = list(pcs)
    pcs.clear()
    await asyncio.gather(*(pc.close() for pc in pending))
# TLS configuration for the HTTPS listener, built when this module is loaded.
# Purpose.CLIENT_AUTH yields a context intended for server-side sockets.
ssl_context = ssl.create_default_context(purpose=ssl.Purpose.CLIENT_AUTH)
# The certificate and private key paths are relative, so they are resolved
# against the current working directory.
ssl_context.load_cert_chain(certfile='./server/server.crt', keyfile='./server/server.key')
if __name__ == "__main__":
    # Load settings from a .env file, if any, before the app is built.
    load_dotenv()

    app = web.Application()
    # Close all registered peer connections when the application shuts down.
    app.on_shutdown.append(close_peer_connections)

    # Routes
    app.add_routes(
        [
            web.get("/", index),
            web.post("/offer", offer),
            # web.post('/ice-candidate', ice_candidate),
        ]
    )
    # Everything else under "/" is served from the public directory.
    app.router.add_static(prefix='/', path=os.path.join(ROOT, "public"), name='public')

    web.run_app(app, port=8443, ssl_context=ssl_context, access_log=None)