forked from Zeroloop/lasso-redis
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathredis.threads.lasso
85 lines (68 loc) · 1.75 KB
/
redis.threads.lasso
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
<?lasso
// Store connections in remote threads
! ::redis_pipes->istype
? define redis_pipes => thread {
parent map
public oncreate => ..oncreate
public close => {
with key in .keys
let redis_pipe = .find(#key)
do {
.remove(#key)
#redis_pipe->close
}
}
}
define close_redis_pipes => {
with key in redis_pipes->keys
let redis = redis_pipes->find(#key)
where #redis
do {
redis_pipes->remove(#key)
#redis->close
}
}
// Initiates redis thread and returns read and write sockets
define redis_pipe => type {
data
public id,
public read_pipe,
public write_pipe,
public redis
public oncreate => {
.id = lasso_uniqueid
.redis = givenblock
local(pipe) = .split_thread(.redis)
.write_pipe = #pipe->first
.read_pipe = #pipe->second
// Store FD
redis_pipes->insert(.id = self)
}
public close => {
// Remove FD
redis_pipes->remove(.id)
protect => {
.write('close')
}
}
public read => .read_pipe->readObject
public tryread => .read_pipe->tryReadObject
public write(p::any) => .write_pipe->writeobject(#p)
public split_thread(redis::redis_client)
=> split_thread => {
local(
write_pipe = #1->first,
read_pipe = #1->second
)
while(true) => {
// Write response to thread
#redis->listen(1) => {
#write_pipe->writeobject(#1 || '')
}
// Check for close command
#read_pipe->tryReadObject == 'close'
? #redis->close && abort
}
}
}
?>