-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathHandle.py
75 lines (56 loc) · 1.72 KB
/
Handle.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
from Events import *
from enum import Enum
def defreadcb(reventFd, reventMask):
    """Default read callback: print a marker line and do nothing else.

    Used as the ``readcb`` default of ``Handle.__init__``.

    Args:
        reventFd: descriptor the event was reported for (unused here).
        reventMask: reported event mask (unused here).
    """
    # Call form with a single string prints the same text on Python 2 and 3.
    print("defreadcb")
def defwritecb(reventFd, reventMask):
    """Default write callback: print a marker line and do nothing else.

    Used as the ``writecb`` default of ``Handle.__init__``.

    Args:
        reventFd: descriptor the event was reported for (unused here).
        reventMask: reported event mask (unused here).
    """
    # Call form with a single string prints the same text on Python 2 and 3.
    print("defwritecb")
def deferrorcb(reventFd, reventMask):
    """Default error callback: print a marker line and do nothing else.

    Used as the ``errorcb`` default of ``Handle.__init__``.

    Args:
        reventFd: descriptor the event was reported for (unused here).
        reventMask: reported event mask (unused here).
    """
    # Call form with a single string prints the same text on Python 2 and 3.
    print("deferrorcb")
def defclosecb(reventFd, reventMask):
    """Default close callback: print a marker line and do nothing else.

    Used as the ``closecb`` default of ``Handle.__init__``.

    Args:
        reventFd: descriptor the event was reported for (unused here).
        reventMask: reported event mask (unused here).
    """
    # Call form with a single string prints the same text on Python 2 and 3.
    print("defclosecb")
class Handle(object):
    """Pair an ``Events`` object with the callbacks that service it.

    A handle stores one ``Events`` instance (read here through its ``fd_``
    and ``eventMask_`` attributes) together with read / write / error /
    close callbacks.  ``handEvent`` routes a reported event mask to exactly
    one of those callbacks.
    """

    class STATE(Enum):
        """State tag stored on ``Handle.state``; new handles start as ADD."""
        # NOTE(review): presumably these track the handle's registration
        # with an event loop (add / modify / delete / being polled) --
        # confirm against the code that reads ``Handle.state``.
        ADD = 1
        MOD = 2
        DEL = 3
        LOOP = 4

    def __init__(self, events, readcb=defreadcb, writecb=defwritecb,
                 errorcb=deferrorcb, closecb=defclosecb):
        """Create a handle for ``events``.

        Args:
            events: the ``Events`` instance this handle wraps.
            readcb: called as ``readcb(fd, mask)`` for readable events.
            writecb: called as ``writecb(fd, mask)`` for writable events.
            errorcb: called as ``errorcb(fd, mask)`` for error events.
            closecb: called as ``closecb(fd, mask)`` on hang-up without
                pending input.

        Raises:
            AssertionError: if ``events`` is not an ``Events`` instance.
        """
        # isinstance (rather than an exact type comparison) also accepts
        # subclasses of Events.
        assert isinstance(events, Events)
        self.events_ = events
        self.readcb_ = readcb
        self.writecb_ = writecb
        self.errorcb_ = errorcb
        self.closecb_ = closecb
        self.state = Handle.STATE.ADD

    def setEvents(self, events):
        """Replace the wrapped ``Events`` object.

        Raises:
            AssertionError: if ``events`` is not an ``Events`` instance.
        """
        assert isinstance(events, Events)
        self.events_ = events

    def getEvents(self):
        """Return the wrapped ``Events`` object."""
        return self.events_

    def getEventFd(self):
        """Return the ``fd_`` attribute of the wrapped ``Events`` object."""
        return self.events_.fd_

    def getEventMask(self):
        """Return the ``eventMask_`` attribute of the wrapped ``Events``."""
        return self.events_.eventMask_

    def addEventMask(self, eventMask):
        """Forward ``eventMask`` to ``Events.addEventMask``."""
        self.events_.addEventMask(eventMask)

    def delEventMask(self, eventMask):
        """Forward ``eventMask`` to ``Events.delEventMask``."""
        self.events_.delEventMask(eventMask)

    def setReadCb(self, readcb):
        """Replace the read callback."""
        self.readcb_ = readcb

    def setWriteCb(self, writecb):
        """Replace the write callback."""
        self.writecb_ = writecb

    def setErrorCb(self, errorcb):
        """Replace the error callback."""
        self.errorcb_ = errorcb

    def setCloseCb(self, closecb):
        """Replace the close callback."""
        self.closecb_ = closecb

    def handEvent(self, reventFd, reventMask):
        """Dispatch a reported event mask to exactly one callback.

        Branches are tried in this order and only the first match runs:
        close (hang-up with no input pending), read, write, error.  If
        nothing matches, a diagnostic line is printed.

        Args:
            reventFd: descriptor the event was reported for; passed through
                to the callback unchanged.
            reventMask: bit mask of reported events; passed through to the
                callback unchanged.
        """
        # The hang-up-without-input test must come first.  Previously the
        # read branch also matched EPOLLHUP, so the close branch at the end
        # of the chain could never run and closecb_ was never called.
        if (reventMask & EPOLLHUP) and not (reventMask & EPOLLIN):
            self.closecb_(reventFd, reventMask)
        elif reventMask & (EPOLLIN | EPOLLPRI):
            # A hang-up that still has input pending lands here via EPOLLIN.
            self.readcb_(reventFd, reventMask)
        elif reventMask & EPOLLOUT:
            self.writecb_(reventFd, reventMask)
        # NOTE(review): POLLERR is the name the original code uses, unlike
        # the EPOLL* names above; confirm the Events module exports it (or
        # switch to EPOLLERR if that is what Events provides).
        elif reventMask & POLLERR:
            self.errorcb_(reventFd, reventMask)
        else:
            print("handle no event!")