Skip to content

Commit

Permalink
Garbage collect synchronized items from channels put/get queues.
Browse files Browse the repository at this point in the history
* fibers/conditions.scm (make-counter, %steps-till-gc, counter-decrement!)
(counter-reset!): Moved to new module, counter.scm.
* fibers/counter.scm: New file.  Rename `%steps-till-gc' to
`%countdown-steps'.
* Makefile.am: Add counter.scm.
* fibers/channels.scm (<channel>, make-channel): Add new slots
`getq-gc-counter' and `putq-gc-counter'.
(put-operation, get-operation): Garbage collect synchronized items
from queues.
* fibers/deque.scm (dequeue-filter, dequeue-filter!): New procedures.
  • Loading branch information
cwebber committed Aug 10, 2017
1 parent 932f92c commit 0fa1fd6
Show file tree
Hide file tree
Showing 5 changed files with 91 additions and 35 deletions.
1 change: 1 addition & 0 deletions Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ SOURCES = \
fibers/channels.scm \
fibers/conditions.scm \
fibers/config.scm \
fibers/counter.scm \
fibers/deque.scm \
fibers/epoll.scm \
fibers/interrupts.scm \
Expand Down
30 changes: 25 additions & 5 deletions fibers/channels.scm
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
;; Channels

;;;; Copyright (C) 2016 Andy Wingo <[email protected]>
;;;; Copyright (C) 2017 Christopher Allan Webber <[email protected]>
;;;;
;;;; This library is free software; you can redistribute it and/or
;;;; modify it under the terms of the GNU Lesser General Public
Expand Down Expand Up @@ -31,6 +32,7 @@
#:use-module (srfi srfi-9 gnu)
#:use-module (ice-9 atomic)
#:use-module (ice-9 match)
#:use-module (fibers counter)
#:use-module (fibers deque)
#:use-module (fibers operations)
#:export (make-channel
Expand All @@ -41,23 +43,27 @@
get-message))

(define-record-type <channel>
(%make-channel getq putq)
(%make-channel getq getq-gc-counter putq putq-gc-counter)
channel?
;; atomic box of deque
(getq channel-getq)
(getq-gc-counter channel-getq-gc-counter)
;; atomic box of deque
(putq channel-putq))
(putq channel-putq)
(putq-gc-counter channel-putq-gc-counter))

(define (make-channel)
"Make a fresh channel."
(%make-channel (make-atomic-box (make-empty-deque))
(make-atomic-box (make-empty-deque))))
(make-counter)
(make-atomic-box (make-empty-deque))
(make-counter)))

(define (put-operation channel message)
"Make an operation that if and when it completes will rendezvous
with a receiver fiber to send @var{message} over @var{channel}."
(match channel
(($ <channel> getq-box putq-box)
(($ <channel> getq-box getq-gc-counter putq-box putq-gc-counter)
(define (try-fn)
;; Try to find and perform a pending get operation. If that
;; works, return a result thunk, or otherwise #f.
Expand Down Expand Up @@ -105,6 +111,13 @@ with a receiver fiber to send @var{message} over @var{channel}."
(not (eq? put-flag get-flag)))))
;; First, publish this put operation.
(enqueue! putq-box (vector put-flag resume-put message))
;; Next, possibly clear off any garbage from queue.
(when (= (counter-decrement! putq-gc-counter) 0)
(dequeue-filter! putq-box
(match-lambda
(#(flag resume)
(not (eq? (atomic-box-ref flag) 'S)))))
(counter-reset! putq-gc-counter))
;; In the try phase, we scanned the getq for a get operation,
;; but we were unable to perform any of them. Since then,
;; there might be a new get operation on the queue. However
Expand Down Expand Up @@ -174,7 +187,7 @@ with a receiver fiber to send @var{message} over @var{channel}."
"Make an operation that if and when it completes will rendezvous
with a sender fiber to receive one value from @var{channel}."
(match channel
(($ <channel> getq-box putq-box)
(($ <channel> getq-box getq-gc-counter putq-box putq-gc-counter)
(define (try-fn)
;; Try to find and perform a pending put operation. If that
;; works, return a result thunk, or otherwise #f.
Expand Down Expand Up @@ -220,6 +233,13 @@ with a sender fiber to receive one value from @var{channel}."
(not (eq? get-flag put-flag)))))
;; First, publish this get operation.
(enqueue! getq-box (vector get-flag resume-get))
;; Next, possibly clear off any garbage from queue.
(when (= (counter-decrement! getq-gc-counter) 0)
(dequeue-filter! getq-box
(match-lambda
(#(flag resume)
(not (eq? (atomic-box-ref flag) 'S)))))
(counter-reset! getq-gc-counter))
;; In the try phase, we scanned the putq for a live put
;; operation, but we were unable to synchronize. Since then,
;; there might be a new operation on the putq. However only
Expand Down
30 changes: 1 addition & 29 deletions fibers/conditions.scm
Original file line number Diff line number Diff line change
Expand Up @@ -33,42 +33,14 @@
#:use-module (ice-9 atomic)
#:use-module (ice-9 match)
#:use-module (fibers stack)
#:use-module (fibers counter)
#:use-module (fibers operations)
#:export (make-condition
condition?
signal-condition!
wait-operation
wait))


;;; Counter utilities
;;;
;;; Counters here are an atomic box containing an integer which are
;;; either decremented or reset.

;; How many times we run the block-fn until we gc
(define %steps-till-gc 42) ; haven't tried testing for the most efficient number

(define (make-counter)
(make-atomic-box %steps-till-gc))

(define (counter-decrement! counter)
"Decrement integer in atomic box COUNTER."
(let spin ((x (atomic-box-ref counter)))
(let* ((x-new (1- x))
(x* (atomic-box-compare-and-swap! counter x x-new)))
(if (= x* x) ; successful decrement
x-new
(spin x*)))))

(define (counter-reset! counter)
"Reset a counter's contents."
(atomic-box-set! counter %steps-till-gc))


;;; Conditions


(define-record-type <condition>
(%make-condition signalled? waiters gc-step)
condition?
Expand Down
49 changes: 49 additions & 0 deletions fibers/counter.scm
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
;; Counters

;;;; Copyright (C) 2017 Christopher Allan Webber <[email protected]>
;;;;
;;;; This library is free software; you can redistribute it and/or
;;;; modify it under the terms of the GNU Lesser General Public
;;;; License as published by the Free Software Foundation; either
;;;; version 3 of the License, or (at your option) any later version.
;;;;
;;;; This library is distributed in the hope that it will be useful,
;;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
;;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
;;;; Lesser General Public License for more details.
;;;;
;;;; You should have received a copy of the GNU Lesser General Public
;;;; License along with this library; if not, write to the Free Software
;;;; Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

;;; General atomic counters; currently used for garbage collection.

(define-module (fibers counter)
#:use-module (ice-9 atomic)
#:export (make-counter
counter-decrement!
counter-reset!))

;;; Counter utilities
;;;
;;; Counters here are an atomic box containing an integer which are
;;; either decremented or reset.

;; How many times we run the block-fn until we gc
(define %countdown-steps 42) ; haven't tried testing for the most efficient number

(define* (make-counter)
(make-atomic-box %countdown-steps))

(define (counter-decrement! counter)
"Decrement integer in atomic box COUNTER."
(let spin ((x (atomic-box-ref counter)))
(let* ((x-new (1- x))
(x* (atomic-box-compare-and-swap! counter x x-new)))
(if (= x* x) ; successful decrement
x-new
(spin x*)))))

(define (counter-reset! counter)
"Reset a counter's contents."
(atomic-box-set! counter %countdown-steps))
16 changes: 15 additions & 1 deletion fibers/deque.scm
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
;; Double-ended queue

;;;; Copyright (C) 2016 Andy Wingo <[email protected]>
;;;; Copyright (C) 2017 Christopher Allan Webber <[email protected]>
;;;;
;;;; This library is free software; you can redistribute it and/or
;;;; modify it under the terms of the GNU Lesser General Public
Expand All @@ -27,10 +28,12 @@
dequeue
dequeue-all
dequeue-match
dequeue-filter
undequeue
dequeue!
dequeue-all!
enqueue!))
enqueue!
dequeue-filter!))

;; A functional double-ended queue ("deque") has a head and a tail,
;; which are both lists. The head is in FIFO order and the tail is in
Expand Down Expand Up @@ -82,6 +85,12 @@
((head . tail)
(make-deque (cons item head) tail))))

(define (dequeue-filter dq pred)
(match dq
((head . tail)
(cons (filter pred head)
(filter pred tail)))))

(define-inlinable (update! box f)
(let spin ((x (atomic-box-ref box)))
(call-with-values (lambda () (f x))
Expand Down Expand Up @@ -110,3 +119,8 @@
(update! dqbox (lambda (dq)
(values (enqueue dq item)
#f))))

(define (dequeue-filter! dqbox pred)
(update! dqbox (lambda (dq)
(values (dequeue-filter dq pred)
#f))))

0 comments on commit 0fa1fd6

Please sign in to comment.