This repository has been archived by the owner on Feb 10, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathProcess.hs
259 lines (221 loc) · 11.3 KB
/
Process.hs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
---------------------------------------------------------------------------------------------------
---- "Взаимодействующие последовательные процессы", как описано в книге Хоара. ----
---------------------------------------------------------------------------------------------------
-- |
-- Module : Process
-- Copyright : (c) Bulat Ziganshin <[email protected]>
-- License : Public domain
--
-- Maintainer : [email protected]
-- Stability : experimental
-- Portability : GHC
--
-----------------------------------------------------------------------------
module Process where
{-
Процессы соединяются в цепочку операторами "|>" или "|>>>" и запускаются на выполнение функцией runP:
runP( read_files |>>> compress |> write_data )
Процессы исполняются параллельно благодаря тому, что для их запуска используется функция forkOS.
Каждый процесс описывается обычной функцией, которая получает дополнительный параметр типа Pipe.
С этой переменной можно выполнять операцию receiveP для получения данных от предыдущего
процесса в списке, и операцию sendP для посылки данных следующему процессу в списке:
compress pipe = foreverM (do data <- receiveP pipe; .....; sendP pipe compressed_data)
Данные от процесса к процессу передаются "слева направо". В зависимости от использованной
при создании связи между процессами операции - "|>" или "|>>>" - в канал между этими процессами
можно поместить только одно или неограниченное кол-во значений (реализуется с помощью MVar/Chan,
соответственно).
Данные также можно посылать в обратную сторону ("справа налево") операциями send_backP и receive_backP.
Канал обратной связи всегда имеет неограниченную ёмкость. Его можно использовать, например,
для подтверждения выполнения операций, синхронизации, возвращения использованных ресурсов
(например, буферов ввода/вывода):
производитель: sendP pipe (buf,len); receive_backP pipe; теперь буфер свободен
потребитель: (buf,len) <- receiveP pipe; hPutBuf file buf len; send_backP pipe ()
Операция runP выполняется синхронно, она завершается по окончании выполнения последнего процесса
в цепочке (даже если остальные процессы ещё не завершились). Если первый процесс в списке
запускаемых пытается обмениваться с предыдущим (т.е. выполняет операции receiveP/send_backP) или
последний процесс пытается обмениваться со следующим - то сигнализируется ошибка.
Операция runAsyncP запускает процесс или цепочку процессов асинхронно и возвращает Pipe для обмена
с ним(и). В этом случае и первый процесс в цепочке может общаться с "предыдущим", и последний - со
"следующим", хотя это и не обязательно:
pipe <- runAsyncP compress; sendP pipe data; compressed_data <- receiveP pipe
pipe <- runAsyncP( compress |> write_data ); sendP pipe data
pipe <- runAsyncP( read_files |>>> compress ); compressed_data <- receiveP pipe
runAsyncP( read_files |>>> compress |> write_data )
Входная и выходная очереди асинхронно запущенного процесса - (пока) одноэлементные.
-}
import Prelude hiding (catch)
import Control.Concurrent
import Control.OldException
import Control.Monad
import Data.IORef
-- |Операция соединения двух последовательных процессов:
-- выходной канал первого становится входным каналом второго.
-- "|>" создаёт одноэлементную очередь, а "|>>>" - очередь неограниченной длины
infixl 1 |>, |>>>
p1 |> p2 = createP p1 p2 newEmptyMVar
p1 |>>> p2 = createP p1 p2 newChan
createP p1 p2 create_inner (Pipe pid finished income income_back outcome outcome_back) = do
inner <- create_inner -- Канал между p1 и p2 (MVar или Chan)
inner_back <- newChan -- Обратный канал между p1 и p2
p1_finished <- newEmptyMVar -- Признак завершения выполнения p1
-- Запустим первый процесс в отдельном треде, а второй исполним напрямую
p1_id <- forkOS$ (p1 (Pipe pid finished income income_back inner inner_back) >> return ())
`finally` (putMVar p1_finished ())
--
p2 (Pipe (Just p1_id) (Just p1_finished) inner inner_back outcome outcome_back)
takeMVar p1_finished
return ()
-- |Запустить комбинированный процесс, созданный операциями "|>" и "|>>>"
runP p = do
p (Pipe Nothing
Nothing
(error "First process in runP tried to receive")
(error "First process in runP tried to send_back")
(error "Last process in runP tried to send")
(error "Last process in runP tried to receive_back"))
-- |Запустить процесс асинхронно и возвратить канал для обмена с ним
runAsyncP p = do
income <- newEmptyMVar
outcome <- newEmptyMVar
income_back <- newChan
outcome_back <- newChan
parent_id <- myThreadId
p_finished <- newEmptyMVar
p_id <- forkOS (p (Pipe Nothing Nothing income income_back outcome outcome_back)
-- `catch` (\e -> do killThread parent_id; throwIO e)
`finally` putMVar p_finished ())
return (Pipe (Just p_id) (Just p_finished) outcome outcome_back income income_back)
-- Создадим тред и гарантируем его корректное завершение посылкой спец. значения
bracketedRunAsyncP process value =
bracket (runAsyncP process)
(\pipe -> do sendP pipe value; joinP pipe)
-- |Канал обмена с соседними процессами, который получает в своё распоряжение каждый процесс.
-- Канал имеет 6 элементов - ИД предыдущего (запущенного асинхронно) процесса,
-- MVar-переменная, сигнализирующая о его завершении,
-- входные данные, отсылка подтверждений,
-- выходные данные, получение подтверждений
data Pipe a b c d = Pipe (Maybe ThreadId) (Maybe (MVar ())) a b c d
killP pipe@(Pipe (Just pid) _ _ _ _ _) = killThread pid >> joinP pipe
joinP (Pipe _ (Just finished) _ _ _ _) = takeMVar finished
receiveP (Pipe pid finished income income_back outcome outcome_back) = getP income
sendP (Pipe pid finished income income_back outcome outcome_back) = putP outcome
receive_backP (Pipe pid finished income income_back outcome outcome_back) = getP outcome_back
send_backP (Pipe pid finished income income_back outcome outcome_back) = putP income_back
-- |Довольно странная операция - "возвращение" сообщений самому себе - так, как если бы это сделал
-- последующий процесс в очереди. Но она нужна для создания начального пула ресурсов, используемых
-- процессом
send_back_itselfP (Pipe pid finished income income_back outcome outcome_back) = putP outcome_back
-- |Элемент канала между процессами - может иметь тип как MVar, так и Chan
class PipeElement e where
getP :: e a -> IO a
putP :: e a -> a -> IO ()
instance PipeElement MVar where
getP = takeMVar
putP = putMVar
instance PipeElement Chan where
getP = readChan
putP = writeChan
-- |Псевдо-канал процесса - состоит из двух явно заданных функций для получения и посылки данных
data PairFunc a = PairFunc (IO a) (a -> IO ())
instance PipeElement PairFunc where
getP (PairFunc get_f put_f) = get_f
putP (PairFunc get_f put_f) = put_f
-- |Процедура запуска процесса с 4 функциями для эмуляции каналов ввода/вывода
runFuncP p receive_f send_back_f send_f receive_back_f =
p (Pipe Nothing
Nothing
(PairFunc receive_f undefined)
(PairFunc undefined send_back_f)
(PairFunc undefined send_f)
(PairFunc receive_back_f undefined))
-- |Создать pipe, состоящий просто из каналов "туда" и "обратно"
newPipe = do
chan <- newChan -- Канал "туда"
chan_back <- newChan -- Канал "обратно"
return (Pipe Nothing Nothing chan chan_back chan chan_back)
{-# NOINLINE createP #-}
{-# NOINLINE runP #-}
{-# NOINLINE runAsyncP #-}
{-# NOINLINE runFuncP #-}
{-# NOINLINE newPipe #-}
-- Пример использования:
{-
exampleP = do
-- Demonstrates using of "runP"
print "runP: before"
runP( producer 5 |> transformer (++"*2") |> transformer (++"+1") |> printer "runP" )
print "runP: after"
-- Demonstrates using of "runAsyncP" to run computation as parallelly computed function
pipe <- runAsyncP (transformer (++" modified"))
sendP pipe "value"
n <- receiveP pipe
print n
-- Demonstrates using of "runAsyncP" with "|>"
pipe <- runAsyncP( transformer (++"*2") |> transformer (++"+1") )
sendP pipe "7"
n <- receiveP pipe
print n
-- Demonstrates using of "runAsyncP" to run asynchronous process
print "runAsyncP: before"
pipe <- runAsyncP( producer 7 |> printer "runAsyncP" )
print "runAsyncP: after?"
producer n pipe = do
mapM_ (sendP pipe.show) [1..n]
sendP pipe "0"
transformer f pipe = do
n <- receiveP pipe
sendP pipe (f n)
transformer f pipe
printer str pipe = do
n <- receiveP pipe
when (head n/='0')$ do print$ str ++ ": " ++ n
printer str pipe
-}
{- Design principles:
1. Процессы в runP должны запускаться слева направо. При небольшом объёме
обрабатываемых данных это приведёт к тому, что первый процесс в
транспортёре произведёт все необходимые данные и завершится прежде, чем
второй и последующие процессы вообще будут запущены
2. runP должен запустить все процессы в дополнительных тредах и дожидаться
их завершения. Выход из runP желательно производить только после
завершения работы всех процессов
3. При завершении процесса предыдущий процесс в транспортёре должен получить
исключение для того, чтобы завершиться как можно быстрее (после чего
предшествующий ему процесс должен получить исключение в свою очередь).
Следующий же процесс должен получить только информацию о завершении
входных данных при попытке их прочитать (tryReceiveP, eofP)
4. При возникновении необработанного исключения в одном из процессов все
остальные процессы в транспортёре должны быть прекращены (посылкой сигнала
KillThread) и это исключение перевозбуждено в основном процессе
5. runP (p1 |> p2 |> protectP p) защищает процесс `p` от возбуждения в нём исключений,
вместо этого возникающие ситуации только сигнализируются в состоянии канала
6. Для ожидания завершения процесса, запущенного по runAsyncP или
предыдущего процесса в транспортёре ввести операцию joinP pipe
7. "p |> yP p1 p2" посылает вывод одного процесса двум другим
8. killP pipe убивает все процессы из запущенного асинхронно транспортёра
9. Нужны удобные и эффективные средства для создания процессов, имеющих
несколько входных и/или выходных каналов (использовать getP/putP?)
10. new_pipe <- insertOnInputP old_pipe process - вставить новый процесс перед своим входом
new_pipe <- insertOnOutputP old_pipe process - вставить новый процесс после своего выхода
p1 |> p2 --> PChain p1 p2 ?
(p1 |> p2) pipe{ MainThreadId, ref_threads... }
p2_threadId <- forkIO $ (p2 pipe2 >> writeIORef pipe.isEof True - по окончании второго процесса)
`catch` (throwTo MainThreadId)
addToMVar ref_threads p2_threadId
p1 pipe1
p1 |> (p2 |> p3)
forkIO (forkIO p3; p2)
p1
runP p =
p_threadId <- forkIO$ p pipe{ MainThreadId = MyThreadId, ref_threads = newIORef [], ...}
addToMVar ref_threads p_threadId
wait them all `catch` (\e -> mapM killThread ref_threads; throw e)
11. Вокруг запущенного треда - катч, который убивает сына, дожидается его завершения и посылает
полученное исключение отцу
-}
{-New design guidelines:
1. a|>b запускается как "fork a; b"
2. При завершении b дождаться завершения a
3. При возникновении в любом из процессов необработанного сигнала
надо убивать все порцессы в транспортёре и перевозбуждать этот сигнал в основной порграмме
-}