From 5ca9772a719378f559769bf01f4c30430e24e56c Mon Sep 17 00:00:00 2001 From: glyh Date: Sat, 6 Apr 2024 21:09:15 +0800 Subject: [PATCH 01/52] stick to style --- stdlib/draft-synch.ysh | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/stdlib/draft-synch.ysh b/stdlib/draft-synch.ysh index 0120c3657c..4d6c6d4f13 100644 --- a/stdlib/draft-synch.ysh +++ b/stdlib/draft-synch.ysh @@ -2,9 +2,9 @@ module stdlib/synch || return 0 -############################ -### FIFO File Desriptors ### -############################ +# +# FIFO File Desriptors +# proc fifo-fd-new(; out_fd) { # WARN: this section should be critical but for now it's not @@ -24,9 +24,9 @@ proc fifo-fd-destroy(; fd) { rm $fifoFile } -################# -### Semaphore ### -################# +# +# Semaphore +# proc sema-new(; value, out_sema) { fifo-fd-new (&sema) From a10aae3f5053b2117ab07b611935cea6fbd890fb Mon Sep 17 00:00:00 2001 From: glyh Date: Sat, 6 Apr 2024 21:09:48 +0800 Subject: [PATCH 02/52] reduce test delay, write more comprehensive test note --- spec/ysh-stdlib-synch.test.sh | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/spec/ysh-stdlib-synch.test.sh b/spec/ysh-stdlib-synch.test.sh index a625b681e7..6efe5ce9d6 100644 --- a/spec/ysh-stdlib-synch.test.sh +++ b/spec/ysh-stdlib-synch.test.sh @@ -2,29 +2,29 @@ ## our_shell: ysh -#### semaphore +#### semaphore syncrhonizing async jobs source --builtin draft-synch.ysh sema-new (1, &s) fork { - sleep 0.5 + sleep 0.2 sema-down (s) echo 1 } fork { - sleep 1 + sleep 0.4 sema-down (s) echo 2 } fork { - sleep 1.5 + sleep 0.6 sema-down (s) echo 3 } -sleep 2 +sleep 0.8 echo 4 sema-up (s) -sleep 0.5 +sleep 0.2 echo 5 sema-up (s) sema-destroy (s) @@ -36,12 +36,12 @@ sema-destroy (s) 3 ## END -#### semaphore init and multiple down +#### semaphore init with 3, async up once and multiple down source --builtin draft-synch.ysh sema-new (3, &s) fork { - sleep 1 + sleep 0.2 sema-up (s) } sema-down (s) From b04d385ca1f781af26e830107ae49b2122153727 Mon Sep 17 00:00:00 2001 From: glyh Date: Sun, 7 Apr 2024 06:03:26 +0800 Subject: [PATCH 03/52] don't close a pipe twice --- stdlib/draft-synch.ysh | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/stdlib/draft-synch.ysh b/stdlib/draft-synch.ysh index 4d6c6d4f13..9788070d22 100644 --- a/stdlib/draft-synch.ysh +++ b/stdlib/draft-synch.ysh @@ -18,10 +18,13 @@ proc fifo-fd-new(; out_fd) { } proc fifo-fd-destroy(; fd) { - var fifoFile = $(readlink /proc/$$/fd/$fd) - exec {fd}>&- - exec {fd}<&- - rm $fifoFile + var fdlink = /proc/$$/fd/$fd + if test -L $fdlink { + var fifoFile = $(readlink $fdlink) + # NOTE: No need to close both end of a pipe. + exec {fd}>&- + rm $fifoFile + } } # From 22abdc3094e26e5c4d62015c91178714adc06240 Mon Sep 17 00:00:00 2001 From: glyh Date: Sun, 7 Apr 2024 06:11:40 +0800 Subject: [PATCH 04/52] fix syntax erro --- stdlib/draft-synch.ysh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/stdlib/draft-synch.ysh b/stdlib/draft-synch.ysh index 9788070d22..1ff1ac213f 100644 --- a/stdlib/draft-synch.ysh +++ b/stdlib/draft-synch.ysh @@ -18,7 +18,7 @@ proc fifo-fd-new(; out_fd) { } proc fifo-fd-destroy(; fd) { - var fdlink = /proc/$$/fd/$fd + var fdlink = "/proc/$$/fd/$fd" if test -L $fdlink { var fifoFile = $(readlink $fdlink) # NOTE: No need to close both end of a pipe. From 8aa830faeb7336b7aa7a3facd834bd5aca29899f Mon Sep 17 00:00:00 2001 From: glyh Date: Sun, 7 Apr 2024 06:29:28 +0800 Subject: [PATCH 05/52] should error with double destroy --- spec/ysh-stdlib-synch.test.sh | 11 +++++++++++ stdlib/draft-synch.ysh | 3 +++ 2 files changed, 14 insertions(+) diff --git a/spec/ysh-stdlib-synch.test.sh b/spec/ysh-stdlib-synch.test.sh index 6efe5ce9d6..08e95a0e92 100644 --- a/spec/ysh-stdlib-synch.test.sh +++ b/spec/ysh-stdlib-synch.test.sh @@ -2,6 +2,17 @@ ## our_shell: ysh +#### fifo pipe double closes +fifo-fd-new (&fd) +try fifo-fd-destroy (fd) +echo $_status +try fifo-fd-destroy (fd) +echo $_status +## STDOUT: +0 +1 +## END + #### semaphore syncrhonizing async jobs source --builtin draft-synch.ysh diff --git a/stdlib/draft-synch.ysh b/stdlib/draft-synch.ysh index 1ff1ac213f..cba3bf52c9 100644 --- a/stdlib/draft-synch.ysh +++ b/stdlib/draft-synch.ysh @@ -24,6 +24,9 @@ proc fifo-fd-destroy(; fd) { # NOTE: No need to close both end of a pipe. exec {fd}>&- rm $fifoFile + } else { + echo "Double Destroying the fd $fd twice!" 1>&2 + exit 1 } } From dfb8819c94091a4c2ad0d809432acaaf7d612a58 Mon Sep 17 00:00:00 2001 From: glyh Date: Sun, 7 Apr 2024 06:38:36 +0800 Subject: [PATCH 06/52] fix: source synch.ysh --- spec/ysh-stdlib-synch.test.sh | 2 ++ 1 file changed, 2 insertions(+) diff --git a/spec/ysh-stdlib-synch.test.sh b/spec/ysh-stdlib-synch.test.sh index 08e95a0e92..e0c5b16f9d 100644 --- a/spec/ysh-stdlib-synch.test.sh +++ b/spec/ysh-stdlib-synch.test.sh @@ -3,6 +3,8 @@ ## our_shell: ysh #### fifo pipe double closes +source --builtin draft-synch.ysh + fifo-fd-new (&fd) try fifo-fd-destroy (fd) echo $_status From 07135a8fdacab4227d73343a5241c6d22a01a20d Mon Sep 17 00:00:00 2001 From: glyh Date: Sun, 7 Apr 2024 06:44:28 +0800 Subject: [PATCH 07/52] fix: try should wrap inner functions --- spec/ysh-stdlib-synch.test.sh | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/spec/ysh-stdlib-synch.test.sh b/spec/ysh-stdlib-synch.test.sh index e0c5b16f9d..395505c45c 100644 --- a/spec/ysh-stdlib-synch.test.sh +++ b/spec/ysh-stdlib-synch.test.sh @@ -6,9 +6,13 @@ source --builtin draft-synch.ysh fifo-fd-new (&fd) -try fifo-fd-destroy (fd) +try { + fifo-fd-destroy (fd) +} echo $_status -try fifo-fd-destroy (fd) +try { + fifo-fd-destroy (fd) +} echo $_status ## STDOUT: 0 From e52c1cd76594485da91f7302560bd99bcb119865 Mon Sep 17 00:00:00 2001 From: glyh Date: Sun, 7 Apr 2024 07:04:14 +0800 Subject: [PATCH 08/52] fix testcase --- spec/ysh-stdlib-synch.test.sh | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/spec/ysh-stdlib-synch.test.sh b/spec/ysh-stdlib-synch.test.sh index 395505c45c..ac5e777466 100644 --- a/spec/ysh-stdlib-synch.test.sh +++ b/spec/ysh-stdlib-synch.test.sh @@ -10,13 +10,10 @@ try { fifo-fd-destroy (fd) } echo $_status -try { - fifo-fd-destroy (fd) -} -echo $_status +fifo-fd-destroy (fd) +## status: 1 ## STDOUT: 0 -1 ## END #### semaphore syncrhonizing async jobs From 0f4640e97bb08a5b7d90adb3b8d26e9bcf724714 Mon Sep 17 00:00:00 2001 From: glyh Date: Sun, 7 Apr 2024 07:52:36 +0800 Subject: [PATCH 09/52] fix typos --- stdlib/draft-synch.ysh | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/stdlib/draft-synch.ysh b/stdlib/draft-synch.ysh index cba3bf52c9..c61e724e30 100644 --- a/stdlib/draft-synch.ysh +++ b/stdlib/draft-synch.ysh @@ -3,7 +3,7 @@ module stdlib/synch || return 0 # -# FIFO File Desriptors +# FIFO File Descriptors # proc fifo-fd-new(; out_fd) { @@ -31,7 +31,7 @@ proc fifo-fd-destroy(; fd) { } # -# Semaphore +# Semaphores # proc sema-new(; value, out_sema) { From de8f0ae4217657872b24ff54100347ace0a480a9 Mon Sep 17 00:00:00 2001 From: glyh Date: Sun, 7 Apr 2024 09:12:40 +0800 Subject: [PATCH 10/52] implement channel --- spec/ysh-stdlib-synch.test.sh | 24 +++++++++++++++++- stdlib/draft-synch.ysh | 47 +++++++++++++++++++++++++++++++++++ 2 files changed, 70 insertions(+), 1 deletion(-) diff --git a/spec/ysh-stdlib-synch.test.sh b/spec/ysh-stdlib-synch.test.sh index ac5e777466..74cea8fd50 100644 --- a/spec/ysh-stdlib-synch.test.sh +++ b/spec/ysh-stdlib-synch.test.sh @@ -67,4 +67,26 @@ echo yes yes ## END -# TODO: add test case for mutex and other sync primitives +#### channel reads and writes +source --builtin draft-synch.ysh + +channel-new (&ch) + +for i in (0..4) { + fork { + for j in (0..4) { + echo $j | channel-pipe-in (ch) + } + } +} + +var sum = 0 +for i in (0..16) { + var cur = $(channel-pipe-out (ch)) => int() + setvar sum += cur +} + +echo $sum +## STDOUT: +24 +## END diff --git a/stdlib/draft-synch.ysh b/stdlib/draft-synch.ysh index c61e724e30..b309efcaf0 100644 --- a/stdlib/draft-synch.ysh +++ b/stdlib/draft-synch.ysh @@ -55,3 +55,50 @@ proc sema-up(; sema, delta = 1) { proc sema-destroy(; sema) { fifo-fd-destroy (sema) } + +# +# Channel +# - Pipeline but you can send multiple objects across pipelines +# - Has to manually ensure the number of sent/received blocks are same across each side. + +proc channel-new(; out_chan) { + sema-new (1, &write_lock) + sema-new (1, &read_lock) + fifo-fd-new (&pipe) + call out_chan->setValue({ + write_lock, + read_lock, + pipe, + }) +} + +const ZERO_CHAR = u'\u{0}' +proc channel-pipe-in (; chan) { + var data = $(cat) # consume everything from input + var data_len = len(data) + sema-down (chan.write_lock) + write -n ${data_len}${ZERO_CHAR}${data} >&$[chan.pipe] + sema-up (chan.write_lock) +} + +proc channel-pipe-out (; chan) { + var data_len = "" + sema-down (chan.read_lock) + while (true) { + read -n 1 next_char <&$[chan.pipe] + if (next_char !== ZERO_CHAR) { + setvar data_len = data_len ++ next_char + } else { + break + } + } + read -n $data_len chunk <&$[chan.pipe] + write -n $chunk + sema-up (chan.read_lock) +} + +proc channel-destroy(; chan) { + fifo-fd-destroy (chan.pipe) + sema-destroy (chan.read_lock) + sema-destroy (chan.write_lock) +} From 008daadfa07c76ad553b038aaf61c42dd1be802d Mon Sep 17 00:00:00 2001 From: glyh Date: Sun, 7 Apr 2024 09:13:50 +0800 Subject: [PATCH 11/52] note on blocking --- stdlib/draft-synch.ysh | 1 + 1 file changed, 1 insertion(+) diff --git a/stdlib/draft-synch.ysh b/stdlib/draft-synch.ysh index b309efcaf0..1ab16943cc 100644 --- a/stdlib/draft-synch.ysh +++ b/stdlib/draft-synch.ysh @@ -60,6 +60,7 @@ proc sema-destroy(; sema) { # Channel # - Pipeline but you can send multiple objects across pipelines # - Has to manually ensure the number of sent/received blocks are same across each side. +# - Blocks, no buffering proc channel-new(; out_chan) { sema-new (1, &write_lock) From b203299e1636250fe1f7d0d977ae955285d9797e Mon Sep 17 00:00:00 2001 From: glyh Date: Sun, 7 Apr 2024 09:15:56 +0800 Subject: [PATCH 12/52] A shorter name for channel functions --- spec/ysh-stdlib-synch.test.sh | 4 ++-- stdlib/draft-synch.ysh | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/spec/ysh-stdlib-synch.test.sh b/spec/ysh-stdlib-synch.test.sh index 74cea8fd50..2025f742f3 100644 --- a/spec/ysh-stdlib-synch.test.sh +++ b/spec/ysh-stdlib-synch.test.sh @@ -75,14 +75,14 @@ channel-new (&ch) for i in (0..4) { fork { for j in (0..4) { - echo $j | channel-pipe-in (ch) + echo $j | channel-in (ch) } } } var sum = 0 for i in (0..16) { - var cur = $(channel-pipe-out (ch)) => int() + var cur = $(channel-out (ch)) => int() setvar sum += cur } diff --git a/stdlib/draft-synch.ysh b/stdlib/draft-synch.ysh index 1ab16943cc..6ee05d9114 100644 --- a/stdlib/draft-synch.ysh +++ b/stdlib/draft-synch.ysh @@ -57,7 +57,7 @@ proc sema-destroy(; sema) { } # -# Channel +# Channels # - Pipeline but you can send multiple objects across pipelines # - Has to manually ensure the number of sent/received blocks are same across each side. # - Blocks, no buffering @@ -74,7 +74,7 @@ proc channel-new(; out_chan) { } const ZERO_CHAR = u'\u{0}' -proc channel-pipe-in (; chan) { +proc channel-in (; chan) { var data = $(cat) # consume everything from input var data_len = len(data) sema-down (chan.write_lock) @@ -82,7 +82,7 @@ proc channel-pipe-in (; chan) { sema-up (chan.write_lock) } -proc channel-pipe-out (; chan) { +proc channel-out (; chan) { var data_len = "" sema-down (chan.read_lock) while (true) { From 92977808ff4021f45e856877278ada421930f771 Mon Sep 17 00:00:00 2001 From: glyh Date: Sun, 7 Apr 2024 09:20:22 +0800 Subject: [PATCH 13/52] remove redundant space --- stdlib/draft-synch.ysh | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/stdlib/draft-synch.ysh b/stdlib/draft-synch.ysh index 6ee05d9114..5de8f98995 100644 --- a/stdlib/draft-synch.ysh +++ b/stdlib/draft-synch.ysh @@ -74,7 +74,7 @@ proc channel-new(; out_chan) { } const ZERO_CHAR = u'\u{0}' -proc channel-in (; chan) { +proc channel-in(; chan) { var data = $(cat) # consume everything from input var data_len = len(data) sema-down (chan.write_lock) @@ -82,7 +82,7 @@ proc channel-in (; chan) { sema-up (chan.write_lock) } -proc channel-out (; chan) { +proc channel-out(; chan) { var data_len = "" sema-down (chan.read_lock) while (true) { From 54843790b484bca0fe1057a4634357a991c8d2f8 Mon Sep 17 00:00:00 2001 From: glyh Date: Sun, 7 Apr 2024 09:25:41 +0800 Subject: [PATCH 14/52] implement functions to test whether fd is opened --- stdlib/draft-synch.ysh | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/stdlib/draft-synch.ysh b/stdlib/draft-synch.ysh index 5de8f98995..8909740829 100644 --- a/stdlib/draft-synch.ysh +++ b/stdlib/draft-synch.ysh @@ -17,10 +17,19 @@ proc fifo-fd-new(; out_fd) { call out_fd->setValue(fd) } -proc fifo-fd-destroy(; fd) { +# Tests whether the fd is opened +func fifo-fd-test(; fd) { var fdlink = "/proc/$$/fd/$fd" if test -L $fdlink { - var fifoFile = $(readlink $fdlink) + return (true) + } else { + return (false) + } +} + +proc fifo-fd-destroy(; fd) { + if (fifo-fd-test(fd)) { + var fifoFile = $(readlink "/proc/$$/fd/$fd") # NOTE: No need to close both end of a pipe. exec {fd}>&- rm $fifoFile From bc8f63c9552b56c77af3f461d571ff7b261ebe5c Mon Sep 17 00:00:00 2001 From: glyh Date: Sun, 7 Apr 2024 09:38:08 +0800 Subject: [PATCH 15/52] abstract out mutex from channel --- stdlib/draft-synch.ysh | 39 ++++++++++++++++++++++++++++++--------- 1 file changed, 30 insertions(+), 9 deletions(-) diff --git a/stdlib/draft-synch.ysh b/stdlib/draft-synch.ysh index 8909740829..b38878dea8 100644 --- a/stdlib/draft-synch.ysh +++ b/stdlib/draft-synch.ysh @@ -65,15 +65,37 @@ proc sema-destroy(; sema) { fifo-fd-destroy (sema) } +# +# Mutex +# + +proc mutex-new(; out_mutex) { + sema-new (1, out_mutex) +} + +proc mutex-acquire(; mutex) { + sema-down (mutex) +} + +proc mutex-release(; mutex) { + sema-up (mutex) +} + +proc mutex-destroy(; mutex) { + sema-destroy (mutex) +} + # # Channels # - Pipeline but you can send multiple objects across pipelines # - Has to manually ensure the number of sent/received blocks are same across each side. # - Blocks, no buffering +const ZERO_CHAR = u'\u{0}' + proc channel-new(; out_chan) { - sema-new (1, &write_lock) - sema-new (1, &read_lock) + mutex-new (&write_lock) + mutex-new (&read_lock) fifo-fd-new (&pipe) call out_chan->setValue({ write_lock, @@ -82,18 +104,17 @@ proc channel-new(; out_chan) { }) } -const ZERO_CHAR = u'\u{0}' proc channel-in(; chan) { var data = $(cat) # consume everything from input var data_len = len(data) - sema-down (chan.write_lock) + mutex-acquire (chan.write_lock) write -n ${data_len}${ZERO_CHAR}${data} >&$[chan.pipe] - sema-up (chan.write_lock) + mutex-release (chan.write_lock) } proc channel-out(; chan) { var data_len = "" - sema-down (chan.read_lock) + mutex-acquire (chan.read_lock) while (true) { read -n 1 next_char <&$[chan.pipe] if (next_char !== ZERO_CHAR) { @@ -104,11 +125,11 @@ proc channel-out(; chan) { } read -n $data_len chunk <&$[chan.pipe] write -n $chunk - sema-up (chan.read_lock) + mutex-release (chan.read_lock) } proc channel-destroy(; chan) { fifo-fd-destroy (chan.pipe) - sema-destroy (chan.read_lock) - sema-destroy (chan.write_lock) + mutex-destroy (chan.read_lock) + mutex-destroy (chan.write_lock) } From 3b079213af33b7229b46fba7e5bb811dd540b8ab Mon Sep 17 00:00:00 2001 From: glyh Date: Sun, 7 Apr 2024 09:48:40 +0800 Subject: [PATCH 16/52] fix func signature --- stdlib/draft-synch.ysh | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/stdlib/draft-synch.ysh b/stdlib/draft-synch.ysh index b38878dea8..e126233483 100644 --- a/stdlib/draft-synch.ysh +++ b/stdlib/draft-synch.ysh @@ -18,7 +18,7 @@ proc fifo-fd-new(; out_fd) { } # Tests whether the fd is opened -func fifo-fd-test(; fd) { +func fifoFdTest(fd) { var fdlink = "/proc/$$/fd/$fd" if test -L $fdlink { return (true) @@ -28,7 +28,7 @@ func fifo-fd-test(; fd) { } proc fifo-fd-destroy(; fd) { - if (fifo-fd-test(fd)) { + if (fifoFdTest(fd)) { var fifoFile = $(readlink "/proc/$$/fd/$fd") # NOTE: No need to close both end of a pipe. exec {fd}>&- From 3d4576f734d003a82f7d05a93628c9d202a7bc23 Mon Sep 17 00:00:00 2001 From: glyh Date: Sun, 7 Apr 2024 10:42:29 +0800 Subject: [PATCH 17/52] implement rwlock, backed by flock --- stdlib/draft-synch.ysh | 88 +++++++++++++++++++++++++++++++++++------- 1 file changed, 73 insertions(+), 15 deletions(-) diff --git a/stdlib/draft-synch.ysh b/stdlib/draft-synch.ysh index e126233483..1051415041 100644 --- a/stdlib/draft-synch.ysh +++ b/stdlib/draft-synch.ysh @@ -3,22 +3,11 @@ module stdlib/synch || return 0 # -# FIFO File Descriptors +# General utilities for fds # -proc fifo-fd-new(; out_fd) { - # WARN: this section should be critical but for now it's not - # A solution may be retry on fail. - #==================== - var fifo = $(mktemp -u) - mkfifo $fifo - #==================== - exec {fd}<>$fifo - call out_fd->setValue(fd) -} - # Tests whether the fd is opened -func fifoFdTest(fd) { +func fdTest(fd) { var fdlink = "/proc/$$/fd/$fd" if test -L $fdlink { return (true) @@ -27,8 +16,8 @@ func fifoFdTest(fd) { } } -proc fifo-fd-destroy(; fd) { - if (fifoFdTest(fd)) { +proc fd-destroy(; fd) { + if (fdTest(fd)) { var fifoFile = $(readlink "/proc/$$/fd/$fd") # NOTE: No need to close both end of a pipe. exec {fd}>&- @@ -39,6 +28,25 @@ proc fifo-fd-destroy(; fd) { } } +# +# FIFO File Descriptors +# + +proc fifo-fd-new(; out_fd) { + # WARN: this section should be critical but for now it's not + # A solution may be retry on fail. + #==================== + var fifo = $(mktemp -u) + mkfifo $fifo + #==================== + exec {fd}<>$fifo + call out_fd->setValue(fd) +} + +proc fifo-fd-destroy (; fd) { + fd-destroy (fd) +} + # # Semaphores # @@ -133,3 +141,53 @@ proc channel-destroy(; chan) { mutex-destroy (chan.read_lock) mutex-destroy (chan.write_lock) } + +# +# RWLock +# + +proc rw-lock-new(; out_lock) { + var lockfile = $(mktemp) + var lock = { + fd: null, + lockfile, + } + call out_lock->setValue(lock) +} + +proc rw-lock-shared(; lock) { + if (lock.fd === null) { + exec {lock_fd}<>$[lock.lockfile] + setvar lock.fd = lock_fd + } + flock -s $[lock.fd] +} + +proc rw-lock-exclusive(; lock) { + if (lock.fd === null) { + exec {lock_fd}<>$[lock.lockfile] + setvar lock.fd = lock_fd + } + flock -x $[lock.fd] +} + +proc rw-lock-promote(; lock) { + rw-unlock (lock) + rw-lock-exclusive (lock) +} + +proc rw-unlock(; lock) { + if (lock.fd === null) { + echo "not locked" + exit 1 + } + flock -u $[lock.fd] +} + +proc rw-lock-destroy(; lock) { + if (lock.fd !== null) { + fd-destroy (lock.fd) + } else { + rm $[lock.lockfile] + } +} From 3543afe45b91d927dbd3358fb9501364f290a6e1 Mon Sep 17 00:00:00 2001 From: glyh Date: Sun, 7 Apr 2024 10:48:51 +0800 Subject: [PATCH 18/52] add vim folds --- stdlib/draft-synch.ysh | 32 ++++++++++++++------------------ stdlib/test.ysh | 16 ++++++++++++++++ 2 files changed, 30 insertions(+), 18 deletions(-) create mode 100755 stdlib/test.ysh diff --git a/stdlib/draft-synch.ysh b/stdlib/draft-synch.ysh index 1051415041..689c345af1 100644 --- a/stdlib/draft-synch.ysh +++ b/stdlib/draft-synch.ysh @@ -1,10 +1,9 @@ #!/usr/bin/env ysh +# vim:foldmethod=marker module stdlib/synch || return 0 -# -# General utilities for fds -# +# General utilities for fds {{{ # Tests whether the fd is opened func fdTest(fd) { @@ -27,10 +26,8 @@ proc fd-destroy(; fd) { exit 1 } } - -# -# FIFO File Descriptors -# +# }}} +# FIFO File Descriptors {{{ proc fifo-fd-new(; out_fd) { # WARN: this section should be critical but for now it's not @@ -47,9 +44,8 @@ proc fifo-fd-destroy (; fd) { fd-destroy (fd) } -# -# Semaphores -# +# }}} +# Semaphores {{{ proc sema-new(; value, out_sema) { fifo-fd-new (&sema) @@ -73,9 +69,8 @@ proc sema-destroy(; sema) { fifo-fd-destroy (sema) } -# -# Mutex -# +# }}} +# Mutex {{{ proc mutex-new(; out_mutex) { sema-new (1, out_mutex) @@ -93,8 +88,8 @@ proc mutex-destroy(; mutex) { sema-destroy (mutex) } -# -# Channels +# }}} +# Channels {{{ # - Pipeline but you can send multiple objects across pipelines # - Has to manually ensure the number of sent/received blocks are same across each side. # - Blocks, no buffering @@ -142,9 +137,8 @@ proc channel-destroy(; chan) { mutex-destroy (chan.write_lock) } -# -# RWLock -# +# }}} +# RWLock {{{ proc rw-lock-new(; out_lock) { var lockfile = $(mktemp) @@ -191,3 +185,5 @@ proc rw-lock-destroy(; lock) { rm $[lock.lockfile] } } + +# }}} diff --git a/stdlib/test.ysh b/stdlib/test.ysh new file mode 100755 index 0000000000..fca019c1a7 --- /dev/null +++ b/stdlib/test.ysh @@ -0,0 +1,16 @@ +#!/usr/bin/env ysh + +source ./draft-synch.ysh + +rw-lock-new (&lk) + +fork { + rw-lock-exclusive (lk) + sleep 5 + rw-unlock (lk) +} +sleep 0.1 +rw-lock-exclusive (lk) +echo 1 +rw-unlock (lk) +wait From 2dfb7ef27b32f293b4261352c7dc78595f5c6715 Mon Sep 17 00:00:00 2001 From: glyh Date: Sun, 7 Apr 2024 11:18:59 +0800 Subject: [PATCH 19/52] implement exhaustive channel --- stdlib/draft-synch.ysh | 87 +++++++++++++++++++++++++++++++++++++++++- 1 file changed, 86 insertions(+), 1 deletion(-) diff --git a/stdlib/draft-synch.ysh b/stdlib/draft-synch.ysh index 689c345af1..b9c535646c 100644 --- a/stdlib/draft-synch.ysh +++ b/stdlib/draft-synch.ysh @@ -132,9 +132,13 @@ proc channel-out(; chan) { } proc channel-destroy(; chan) { + # Ensures no one else is working + mutex-acquire (chan.read_lock) + mutex-acquire (chan.write_lock) + # Clean up fifo-fd-destroy (chan.pipe) - mutex-destroy (chan.read_lock) mutex-destroy (chan.write_lock) + mutex-destroy (chan.read_lock) } # }}} @@ -165,6 +169,17 @@ proc rw-lock-exclusive(; lock) { flock -x $[lock.fd] } +proc rw-try-lock-exclusive(; lock) { + if (lock.fd === null) { + exec {lock_fd}<>$[lock.lockfile] + setvar lock.fd = lock_fd + } + try { + flock -x -n $[lock.fd] + } + return $_status +} + proc rw-lock-promote(; lock) { rw-unlock (lock) rw-lock-exclusive (lock) @@ -187,3 +202,73 @@ proc rw-lock-destroy(; lock) { } # }}} +# Exhaustable Channels {{{ +# Channels but exhaustable + +proc exh-channel-new(; out_chan) { + mutex-new (&write_lock) + mutex-new (&read_lock) + fifo-fd-new (&pipe) + rw-lock-new (&exhaust_lock) + call out_chan->setValue({ + write_lock, + read_lock, + pipe, + exhaust_lock, + }) +} + +proc exh-channel-in(; chan) { + var data = $(cat) # consume everything from input + var data_len = len(data) + mutex-acquire (chan.write_lock) + rw-lock-shared (chan.exhaust_lock) + write -n ${data_len}${ZERO_CHAR}${data} >&$[chan.pipe] + rw-unlock (chan.exhaust_lock) + mutex-release (chan.write_lock) +} + +proc exh-channel-out(; chan) { + var data_len = "" + mutex-acquire (chan.read_lock) + while (true) { + read -n 1 next_char <&$[chan.pipe] + if (next_char !== ZERO_CHAR) { + setvar data_len = data_len ++ next_char + } else { + break + } + } + read -n $data_len chunk <&$[chan.pipe] + write -n $chunk + mutex-release (chan.read_lock) +} + +proc exh-channel-exhaust(; chan, out_ret) { + var ret = [] + while(true) { + try { + rw-try-lock-exclusive (chan.exhaust_lock) + } + if (_status === 0) { + break + } + var buf = $(exh-channel-out (chan)) + call ret->append(buf) + } + rw-unlock (chan.exhaust_lock) + call out_ret->setValue(ret) +} + +proc exh-channel-destroy(; chan) { + # Ensures no one else is working + mutex-acquire (chan.read_lock) + mutex-acquire (chan.write_lock) + rw-lock-exclusive (chan.exhaust_lock) + # Clean up + fifo-fd-destroy (chan.pipe) + rw-lock-destroy (chan.exhaust_lock) + mutex-destroy (chan.write_lock) + mutex-destroy (chan.read_lock) +} +# }}} From 6035fbe280d5649b9854e3ee021098f6048482a1 Mon Sep 17 00:00:00 2001 From: glyh Date: Sun, 7 Apr 2024 11:21:27 +0800 Subject: [PATCH 20/52] remove redundant file --- stdlib/test.ysh | 16 ---------------- 1 file changed, 16 deletions(-) delete mode 100755 stdlib/test.ysh diff --git a/stdlib/test.ysh b/stdlib/test.ysh deleted file mode 100755 index fca019c1a7..0000000000 --- a/stdlib/test.ysh +++ /dev/null @@ -1,16 +0,0 @@ -#!/usr/bin/env ysh - -source ./draft-synch.ysh - -rw-lock-new (&lk) - -fork { - rw-lock-exclusive (lk) - sleep 5 - rw-unlock (lk) -} -sleep 0.1 -rw-lock-exclusive (lk) -echo 1 -rw-unlock (lk) -wait From aa6bc253081ae935de28aa96e25e9a5df6c98dfe Mon Sep 17 00:00:00 2001 From: glyh Date: Sun, 7 Apr 2024 11:26:50 +0800 Subject: [PATCH 21/52] add rwlock test --- spec/ysh-stdlib-synch.test.sh | 33 +++++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/spec/ysh-stdlib-synch.test.sh b/spec/ysh-stdlib-synch.test.sh index 2025f742f3..ca118b28d5 100644 --- a/spec/ysh-stdlib-synch.test.sh +++ b/spec/ysh-stdlib-synch.test.sh @@ -90,3 +90,36 @@ echo $sum ## STDOUT: 24 ## END + +#### RWLock multiple shared lock and free, one exclusive lock +source --builtin draft-synch.ysh + +rw-lock-new (&lk) + +fork { + rw-lock-shared (lk) + echo 1 + sleep 0.3 + rw-unlock (lk) +} +for _ in (0..3) { + fork { + sleep 0.1 + rw-lock-shared (lk) + echo 2 + sleep 0.2 + rw-unlock (lk) + } +} +sleep 0.1 +rw-lock-exclusive (lk) +echo 3 +rw-unlock (lk) +rw-lock-destroy (lk) +## STDOUT: +1 +2 +2 +2 +3 +## END From 3566aa617531f0c7a3bbcbf718d478d4c71d9ffd Mon Sep 17 00:00:00 2001 From: glyh Date: Sun, 7 Apr 2024 16:42:38 +0800 Subject: [PATCH 22/52] add test case for exh-channel --- spec/ysh-stdlib-synch.test.sh | 33 +++++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/spec/ysh-stdlib-synch.test.sh b/spec/ysh-stdlib-synch.test.sh index ca118b28d5..a1542e46e7 100644 --- a/spec/ysh-stdlib-synch.test.sh +++ b/spec/ysh-stdlib-synch.test.sh @@ -87,6 +87,7 @@ for i in (0..16) { } echo $sum +channel-destroy (ch) ## STDOUT: 24 ## END @@ -123,3 +124,35 @@ rw-lock-destroy (lk) 2 3 ## END + +#### Produce many value and exhaust the exhaust the channel once for all, and reuse it +source --builtin draft-synch.ysh + +exh-channel-new (&ch) + +for i in (0..4) { + fork { + for j in (0..4) { + echo $j | exh-channel-in (ch) + } + } +} + +sleep 0.5 +exh-channel-exhaust (ch, &out) +var sum = 0 +for i in (out) { + setvar sum += cur +} +echo $sum +# Reuses the channel +fork { + echo "yes!" | exh-channel-in (ch) +} +exh-channel-out (ch) +echo +exh-channel-destroy (ch) +## STDOUT: +24 +yes! +## END From 99d5337081fc75b1dcd6b5838a4a41711e262a6d Mon Sep 17 00:00:00 2001 From: glyh Date: Sun, 7 Apr 2024 22:59:21 +0800 Subject: [PATCH 23/52] remove redundant promotion --- stdlib/draft-synch.ysh | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/stdlib/draft-synch.ysh b/stdlib/draft-synch.ysh index b9c535646c..9fbad093e2 100644 --- a/stdlib/draft-synch.ysh +++ b/stdlib/draft-synch.ysh @@ -153,6 +153,7 @@ proc rw-lock-new(; out_lock) { call out_lock->setValue(lock) } +# NOTE: to change the type of a lock to T, e.g. from shared to exclusive, just call rw-lock-{T} again. proc rw-lock-shared(; lock) { if (lock.fd === null) { exec {lock_fd}<>$[lock.lockfile] @@ -180,11 +181,6 @@ proc rw-try-lock-exclusive(; lock) { return $_status } -proc rw-lock-promote(; lock) { - rw-unlock (lock) - rw-lock-exclusive (lock) -} - proc rw-unlock(; lock) { if (lock.fd === null) { echo "not locked" From 841c0713de0e3855a825e3ed02d365d05b5441da Mon Sep 17 00:00:00 2001 From: glyh Date: Sun, 7 Apr 2024 23:39:13 +0800 Subject: [PATCH 24/52] implement inc & dec in math --- stdlib/math.ysh | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/stdlib/math.ysh b/stdlib/math.ysh index ff8b5423e3..eec2c6aad0 100644 --- a/stdlib/math.ysh +++ b/stdlib/math.ysh @@ -77,3 +77,11 @@ func abs(x) { return (x) } } + +func inc(x) { + return (x + 1) +} + +func dec(x) { + return (x - 1) +} From 6f79596ae9db8ddad0cb54da2fb5d179722ed30c Mon Sep 17 00:00:00 2001 From: glyh Date: Mon, 8 Apr 2024 00:13:02 +0800 Subject: [PATCH 25/52] try to fix exhaustive channel --- stdlib/draft-synch.ysh | 76 +++++++++++++++++++++++++++--------------- 1 file changed, 49 insertions(+), 27 deletions(-) diff --git a/stdlib/draft-synch.ysh b/stdlib/draft-synch.ysh index 9fbad093e2..a6add8a260 100644 --- a/stdlib/draft-synch.ysh +++ b/stdlib/draft-synch.ysh @@ -153,10 +153,20 @@ proc rw-lock-new(; out_lock) { call out_lock->setValue(lock) } +proc rw-lock-read-out(; lock) { + # TODO: ensure we're actually holding the shared/exclusive lock + cat <&lock.fd +} + +proc rw-lock-write-in(; lock) { + # TODO: ensure we're actually holding the exclusive lock + cat >&lock.fd +} + # NOTE: to change the type of a lock to T, e.g. from shared to exclusive, just call rw-lock-{T} again. proc rw-lock-shared(; lock) { if (lock.fd === null) { - exec {lock_fd}<>$[lock.lockfile] + exec {lock_fd}<$[lock.lockfile] setvar lock.fd = lock_fd } flock -s $[lock.fd] @@ -170,17 +180,6 @@ proc rw-lock-exclusive(; lock) { flock -x $[lock.fd] } -proc rw-try-lock-exclusive(; lock) { - if (lock.fd === null) { - exec {lock_fd}<>$[lock.lockfile] - setvar lock.fd = lock_fd - } - try { - flock -x -n $[lock.fd] - } - return $_status -} - proc rw-unlock(; lock) { if (lock.fd === null) { echo "not locked" @@ -189,6 +188,14 @@ proc rw-unlock(; lock) { flock -u $[lock.fd] } +# Performs an lock-guarded write on a lock's file +proc rw-lock-swap-pipe(; lock) { + rw-lock-exclusive (lock) + rw-lock-read-out + rw-lock-write-in + rw-unlock (lock) +} + proc rw-lock-destroy(; lock) { if (lock.fd !== null) { fd-destroy (lock.fd) @@ -205,23 +212,40 @@ proc exh-channel-new(; out_chan) { mutex-new (&write_lock) mutex-new (&read_lock) fifo-fd-new (&pipe) - rw-lock-new (&exhaust_lock) + # a counter on how many information we have on the pipe + rw-lock-new (&message_count) + # a lock indicating if there's upcoming writes + rw-lock-new (&will_write) + write -n 0 | rw-lock-write-in (&exhaust_lock) call out_chan->setValue({ write_lock, read_lock, pipe, - exhaust_lock, + message_count, + will_write, }) } proc exh-channel-in(; chan) { var data = $(cat) # consume everything from input var data_len = len(data) + # NOTE: the below can be abstracted out to a 2-direction pipe + # This is a pretty common pattern in bash + ################################################# + fifo-fd-new (&fd) + rw-lock-swap-pipe (chan.message_count) <&$fd | { + var cnt + json read (&cnt) + setvar cnt += 1 + echo $cnt + } >&$fd + fifo-fd-destroy (fd) + ################################################# + rw-lock-shared (chan.will_write) mutex-acquire (chan.write_lock) - rw-lock-shared (chan.exhaust_lock) - write -n ${data_len}${ZERO_CHAR}${data} >&$[chan.pipe] - rw-unlock (chan.exhaust_lock) + write -n $data_len$ZERO_CHAR$data >&$[chan.pipe] mutex-release (chan.write_lock) + rw-unlock (chan.will_write) } proc exh-channel-out(; chan) { @@ -242,17 +266,13 @@ proc exh-channel-out(; chan) { proc exh-channel-exhaust(; chan, out_ret) { var ret = [] - while(true) { - try { - rw-try-lock-exclusive (chan.exhaust_lock) - } - if (_status === 0) { - break - } - var buf = $(exh-channel-out (chan)) - call ret->append(buf) + rw-lock-exclusive (chan.will_write) + rw-lock-read-out | json read (&num_data) + + for _ in (0..num_data) { + call ret->append($(exh-channel-out)) } - rw-unlock (chan.exhaust_lock) + rw-unlock (chan.will_write) call out_ret->setValue(ret) } @@ -261,8 +281,10 @@ proc exh-channel-destroy(; chan) { mutex-acquire (chan.read_lock) mutex-acquire (chan.write_lock) rw-lock-exclusive (chan.exhaust_lock) + rw-lock-exclusive (chan.message_count) # Clean up fifo-fd-destroy (chan.pipe) + rw-lock-destroy (chan.message_count) rw-lock-destroy (chan.exhaust_lock) mutex-destroy (chan.write_lock) mutex-destroy (chan.read_lock) From 655168b8d742175c997587e9d5e9246abcf53e5d Mon Sep 17 00:00:00 2001 From: glyh Date: Mon, 8 Apr 2024 00:15:22 +0800 Subject: [PATCH 26/52] note on proc obj --- stdlib/draft-synch.ysh | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/stdlib/draft-synch.ysh b/stdlib/draft-synch.ysh index a6add8a260..6ab1479ab3 100644 --- a/stdlib/draft-synch.ysh +++ b/stdlib/draft-synch.ysh @@ -230,7 +230,8 @@ proc exh-channel-in(; chan) { var data = $(cat) # consume everything from input var data_len = len(data) # NOTE: the below can be abstracted out to a 2-direction pipe - # This is a pretty common pattern in bash + # This is a pretty common pattern in bash. + # However there's no way to refer to a proc for now ################################################# fifo-fd-new (&fd) rw-lock-swap-pipe (chan.message_count) <&$fd | { From 5f4fa08d376f8ab05241dfbd8ddea8c508b5a027 Mon Sep 17 00:00:00 2001 From: glyh Date: Mon, 8 Apr 2024 00:32:41 +0800 Subject: [PATCH 27/52] fix: read/write RW-Lock --- stdlib/draft-synch.ysh | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/stdlib/draft-synch.ysh b/stdlib/draft-synch.ysh index 6ab1479ab3..68007e2587 100644 --- a/stdlib/draft-synch.ysh +++ b/stdlib/draft-synch.ysh @@ -155,12 +155,12 @@ proc rw-lock-new(; out_lock) { proc rw-lock-read-out(; lock) { # TODO: ensure we're actually holding the shared/exclusive lock - cat <&lock.fd + cat <&$[lock.fd] } proc rw-lock-write-in(; lock) { # TODO: ensure we're actually holding the exclusive lock - cat >&lock.fd + cat >&$[lock.fd] } # NOTE: to change the type of a lock to T, e.g. from shared to exclusive, just call rw-lock-{T} again. From 441babe14b081f1bc8a19120a9e7abc95a0de23a Mon Sep 17 00:00:00 2001 From: glyh Date: Mon, 8 Apr 2024 00:53:28 +0800 Subject: [PATCH 28/52] fix some syntax error --- stdlib/draft-synch.ysh | 36 +++++++++++++++++++----------------- 1 file changed, 19 insertions(+), 17 deletions(-) diff --git a/stdlib/draft-synch.ysh b/stdlib/draft-synch.ysh index 68007e2587..0e2789e217 100644 --- a/stdlib/draft-synch.ysh +++ b/stdlib/draft-synch.ysh @@ -108,25 +108,25 @@ proc channel-new(; out_chan) { } proc channel-in(; chan) { - var data = $(cat) # consume everything from input - var data_len = len(data) + var msg = $(cat) # consume everything from input + var msg_len = len(msg) mutex-acquire (chan.write_lock) - write -n ${data_len}${ZERO_CHAR}${data} >&$[chan.pipe] + write -n ${msg_len}${ZERO_CHAR}${msg} >&$[chan.pipe] mutex-release (chan.write_lock) } proc channel-out(; chan) { - var data_len = "" + var msg_len = "" mutex-acquire (chan.read_lock) while (true) { read -n 1 next_char <&$[chan.pipe] if (next_char !== ZERO_CHAR) { - setvar data_len = data_len ++ next_char + setvar msg_len = msg_len ++ next_char } else { break } } - read -n $data_len chunk <&$[chan.pipe] + read -n $msg_len chunk <&$[chan.pipe] write -n $chunk mutex-release (chan.read_lock) } @@ -191,8 +191,8 @@ proc rw-unlock(; lock) { # Performs an lock-guarded write on a lock's file proc rw-lock-swap-pipe(; lock) { rw-lock-exclusive (lock) - rw-lock-read-out - rw-lock-write-in + rw-lock-read-out (lock) + rw-lock-write-in (lock) rw-unlock (lock) } @@ -216,7 +216,9 @@ proc exh-channel-new(; out_chan) { rw-lock-new (&message_count) # a lock indicating if there's upcoming writes rw-lock-new (&will_write) - write -n 0 | rw-lock-write-in (&exhaust_lock) + rw-lock-exclusive (message_count) + write -n 0 | rw-lock-write-in (message_count) + rw-unlock (message_count) call out_chan->setValue({ write_lock, read_lock, @@ -227,8 +229,8 @@ proc exh-channel-new(; out_chan) { } proc exh-channel-in(; chan) { - var data = $(cat) # consume everything from input - var data_len = len(data) + var msg = $(cat) # consume everything from input + var msg_len = len(msg) # NOTE: the below can be abstracted out to a 2-direction pipe # This is a pretty common pattern in bash. # However there's no way to refer to a proc for now @@ -244,23 +246,23 @@ proc exh-channel-in(; chan) { ################################################# rw-lock-shared (chan.will_write) mutex-acquire (chan.write_lock) - write -n $data_len$ZERO_CHAR$data >&$[chan.pipe] + write -n $msg_len$ZERO_CHAR$msg >&$[chan.pipe] mutex-release (chan.write_lock) rw-unlock (chan.will_write) } proc exh-channel-out(; chan) { - var data_len = "" + var msg_len = "" mutex-acquire (chan.read_lock) while (true) { read -n 1 next_char <&$[chan.pipe] if (next_char !== ZERO_CHAR) { - setvar data_len = data_len ++ next_char + setvar msg_len = msg_len ++ next_char } else { break } } - read -n $data_len chunk <&$[chan.pipe] + read -n $msg_len chunk <&$[chan.pipe] write -n $chunk mutex-release (chan.read_lock) } @@ -268,9 +270,9 @@ proc exh-channel-out(; chan) { proc exh-channel-exhaust(; chan, out_ret) { var ret = [] rw-lock-exclusive (chan.will_write) - rw-lock-read-out | json read (&num_data) + rw-lock-read-out (chan.message_count) | json read (&num_msg) - for _ in (0..num_data) { + for _ in (0..num_msg) { call ret->append($(exh-channel-out)) } rw-unlock (chan.will_write) From 68e903c4c591a806857751604f96e0b105cf1a1a Mon Sep 17 00:00:00 2001 From: glyh Date: Mon, 8 Apr 2024 01:02:36 +0800 Subject: [PATCH 29/52] should use lockfile instead of lock fd to read the file --- stdlib/draft-synch.ysh | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/stdlib/draft-synch.ysh b/stdlib/draft-synch.ysh index 0e2789e217..795766cc5f 100644 --- a/stdlib/draft-synch.ysh +++ b/stdlib/draft-synch.ysh @@ -155,12 +155,12 @@ proc rw-lock-new(; out_lock) { proc rw-lock-read-out(; lock) { # TODO: ensure we're actually holding the shared/exclusive lock - cat <&$[lock.fd] + cat $[lock.lockfile] } proc rw-lock-write-in(; lock) { # TODO: ensure we're actually holding the exclusive lock - cat >&$[lock.fd] + cat > $[lock.lockfile] } # NOTE: to change the type of a lock to T, e.g. from shared to exclusive, just call rw-lock-{T} again. From 7142fb2d0788ef1e0b129aa7c263a3c909c2a06e Mon Sep 17 00:00:00 2001 From: glyh Date: Mon, 8 Apr 2024 01:05:25 +0800 Subject: [PATCH 30/52] add test for rwlock read&write --- spec/ysh-stdlib-synch.test.sh | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/spec/ysh-stdlib-synch.test.sh b/spec/ysh-stdlib-synch.test.sh index a1542e46e7..c2c58aa481 100644 --- a/spec/ysh-stdlib-synch.test.sh +++ b/spec/ysh-stdlib-synch.test.sh @@ -125,6 +125,34 @@ rw-lock-destroy (lk) 3 ## END +#### Reading and writing rw-lock +source --builtin draft-synch.ysh + +rw-lock-new (&l) +fork { + rw-lock-exclusive (l) + write -n 'w' | rw-lock-write-in (l) + rw-unlock (l) +} +sleep 0.1 + +for _ in (0..3) { + fork { + rw-lock-shared (l) + rw-lock-read-out (l) + sleep 0.2 + rw-lock-read-out (l) + rw-unlock (l) + } +} +sleep 0.1 +write -n y +wait +write +## STDOUT: +wwwywww +## END + #### Produce many value and exhaust the exhaust the channel once for all, and reuse it source --builtin draft-synch.ysh From 259c1b304674cef6761d2af416c8ccee0a82a630 Mon Sep 17 00:00:00 2001 From: glyh Date: Mon, 8 Apr 2024 01:10:52 +0800 Subject: [PATCH 31/52] typo --- spec/ysh-stdlib-synch.test.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spec/ysh-stdlib-synch.test.sh b/spec/ysh-stdlib-synch.test.sh index c2c58aa481..b519d00f49 100644 --- a/spec/ysh-stdlib-synch.test.sh +++ b/spec/ysh-stdlib-synch.test.sh @@ -153,7 +153,7 @@ write wwwywww ## END -#### Produce many value and exhaust the exhaust the channel once for all, and reuse it +#### Produce many value and exhaust the channel, and then reuse it source --builtin draft-synch.ysh exh-channel-new (&ch) From bd5a8f67d38fe46adcceb109e4a4073d57660016 Mon Sep 17 00:00:00 2001 From: glyh Date: Mon, 8 Apr 2024 03:46:40 +0800 Subject: [PATCH 32/52] fix exhaustable channel and underlying rwlock --- spec/ysh-stdlib-synch.test.sh | 2 +- stdlib/draft-synch.ysh | 88 +++++++++++++++++++---------------- 2 files changed, 49 insertions(+), 41 deletions(-) diff --git a/spec/ysh-stdlib-synch.test.sh b/spec/ysh-stdlib-synch.test.sh index b519d00f49..a746fa2ee6 100644 --- a/spec/ysh-stdlib-synch.test.sh +++ b/spec/ysh-stdlib-synch.test.sh @@ -170,7 +170,7 @@ sleep 0.5 exh-channel-exhaust (ch, &out) var sum = 0 for i in (out) { - setvar sum += cur + setvar sum += i } echo $sum # Reuses the channel diff --git a/stdlib/draft-synch.ysh b/stdlib/draft-synch.ysh index 795766cc5f..0f58f9f379 100644 --- a/stdlib/draft-synch.ysh +++ b/stdlib/draft-synch.ysh @@ -155,45 +155,35 @@ proc rw-lock-new(; out_lock) { proc rw-lock-read-out(; lock) { # TODO: ensure we're actually holding the shared/exclusive lock + # if (lock.fd != null) ... cat $[lock.lockfile] } proc rw-lock-write-in(; lock) { # TODO: ensure we're actually holding the exclusive lock + # if (lock.fd != null) ... cat > $[lock.lockfile] } # NOTE: to change the type of a lock to T, e.g. from shared to exclusive, just call rw-lock-{T} again. proc rw-lock-shared(; lock) { - if (lock.fd === null) { - exec {lock_fd}<$[lock.lockfile] - setvar lock.fd = lock_fd - } + exec {lock_fd}<$[lock.lockfile] + setvar lock.fd = lock_fd flock -s $[lock.fd] } proc rw-lock-exclusive(; lock) { - if (lock.fd === null) { - exec {lock_fd}<>$[lock.lockfile] - setvar lock.fd = lock_fd - } + exec {lock_fd}<>$[lock.lockfile] + setvar lock.fd = lock_fd flock -x $[lock.fd] } +# TODO: detect if locking proc rw-unlock(; lock) { - if (lock.fd === null) { - echo "not locked" - exit 1 - } - flock -u $[lock.fd] -} - -# Performs an lock-guarded write on a lock's file -proc rw-lock-swap-pipe(; lock) { - rw-lock-exclusive (lock) - rw-lock-read-out (lock) - rw-lock-write-in (lock) - rw-unlock (lock) + var lock_fd = lock.fd + setvar lock.fd = null + flock -u $lock_fd + exec {lock_fd}<&- } proc rw-lock-destroy(; lock) { @@ -204,6 +194,18 @@ proc rw-lock-destroy(; lock) { } } +# TODO: rename this to swap-fn +proc rw-lock-swap-f(; lock, fn) { + rw-lock-exclusive (lock) + var swapped = $(rw-lock-read-out (lock)) => fn() + write -n $swapped | rw-lock-write-in (lock) + rw-unlock (lock) +} + +# TODO: Performs an lock-guarded write on a lock's file +# The issue is we need to know when to close the pipe +# proc rw-lock-swap-pipe(; lock) + # }}} # Exhaustable Channels {{{ # Channels but exhaustable @@ -228,22 +230,17 @@ proc exh-channel-new(; out_chan) { }) } +func __synch_exh_channel_inc_untyped (buf) { + var typed = buf => int() + var swapped = typed + 1 + var untyped = "$[swapped]" + return (untyped) +} + proc exh-channel-in(; chan) { var msg = $(cat) # consume everything from input var msg_len = len(msg) - # NOTE: the below can be abstracted out to a 2-direction pipe - # This is a pretty common pattern in bash. - # However there's no way to refer to a proc for now - ################################################# - fifo-fd-new (&fd) - rw-lock-swap-pipe (chan.message_count) <&$fd | { - var cnt - json read (&cnt) - setvar cnt += 1 - echo $cnt - } >&$fd - fifo-fd-destroy (fd) - ################################################# + rw-lock-swap-f (chan.message_count, __synch_exh_channel_inc_untyped) rw-lock-shared (chan.will_write) mutex-acquire (chan.write_lock) write -n $msg_len$ZERO_CHAR$msg >&$[chan.pipe] @@ -251,9 +248,8 @@ proc exh-channel-in(; chan) { rw-unlock (chan.will_write) } -proc exh-channel-out(; chan) { +proc __exh_channel_out_no_lock(; chan) { var msg_len = "" - mutex-acquire (chan.read_lock) while (true) { read -n 1 next_char <&$[chan.pipe] if (next_char !== ZERO_CHAR) { @@ -264,17 +260,29 @@ proc exh-channel-out(; chan) { } read -n $msg_len chunk <&$[chan.pipe] write -n $chunk +} + +proc exh-channel-out(; chan) { + mutex-acquire (chan.read_lock) + __exh_channel_out_no_lock (chan) mutex-release (chan.read_lock) } proc exh-channel-exhaust(; chan, out_ret) { - var ret = [] + # No body should send anything to the channel from now on rw-lock-exclusive (chan.will_write) + # Count how many message we have + rw-lock-exclusive (chan.message_count) rw-lock-read-out (chan.message_count) | json read (&num_msg) + write -n 0 | rw-lock-write-in (chan.message_count) + rw-unlock (chan.message_count) + var ret = [] + mutex-acquire (chan.read_lock) for _ in (0..num_msg) { - call ret->append($(exh-channel-out)) + call ret->append($(__exh_channel_out_no_lock (chan))) } + mutex-release (chan.read_lock) rw-unlock (chan.will_write) call out_ret->setValue(ret) } @@ -283,12 +291,12 @@ proc exh-channel-destroy(; chan) { # Ensures no one else is working mutex-acquire (chan.read_lock) mutex-acquire (chan.write_lock) - rw-lock-exclusive (chan.exhaust_lock) + rw-lock-exclusive (chan.will_write) rw-lock-exclusive (chan.message_count) # Clean up fifo-fd-destroy (chan.pipe) rw-lock-destroy (chan.message_count) - rw-lock-destroy (chan.exhaust_lock) + rw-lock-destroy (chan.will_write) mutex-destroy (chan.write_lock) mutex-destroy (chan.read_lock) } From 715e5956189b218c779c85871f1d763ffff4c341 Mon Sep 17 00:00:00 2001 From: glyh Date: Mon, 8 Apr 2024 03:52:59 +0800 Subject: [PATCH 33/52] rename swap-f to swap-fn for rwlock --- stdlib/draft-synch.ysh | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/stdlib/draft-synch.ysh b/stdlib/draft-synch.ysh index 0f58f9f379..6dab02b665 100644 --- a/stdlib/draft-synch.ysh +++ b/stdlib/draft-synch.ysh @@ -194,8 +194,7 @@ proc rw-lock-destroy(; lock) { } } -# TODO: rename this to swap-fn -proc rw-lock-swap-f(; lock, fn) { +proc rw-lock-swap-fn(; lock, fn) { rw-lock-exclusive (lock) var swapped = $(rw-lock-read-out (lock)) => fn() write -n $swapped | rw-lock-write-in (lock) @@ -240,7 +239,7 @@ func __synch_exh_channel_inc_untyped (buf) { proc exh-channel-in(; chan) { var msg = $(cat) # consume everything from input var msg_len = len(msg) - rw-lock-swap-f (chan.message_count, __synch_exh_channel_inc_untyped) + rw-lock-swap-fn (chan.message_count, __synch_exh_channel_inc_untyped) rw-lock-shared (chan.will_write) mutex-acquire (chan.write_lock) write -n $msg_len$ZERO_CHAR$msg >&$[chan.pipe] From 665fe6675b98dc9d8c0673f22730446ef4138997 Mon Sep 17 00:00:00 2001 From: glyh Date: Mon, 8 Apr 2024 04:07:33 +0800 Subject: [PATCH 34/52] ensure lock is held when read/write/unlock --- stdlib/draft-synch.ysh | 61 +++++++++++++++++++++++++++--------------- 1 file changed, 40 insertions(+), 21 deletions(-) diff --git a/stdlib/draft-synch.ysh b/stdlib/draft-synch.ysh index 6dab02b665..6c761b211b 100644 --- a/stdlib/draft-synch.ysh +++ b/stdlib/draft-synch.ysh @@ -5,19 +5,18 @@ module stdlib/synch || return 0 # General utilities for fds {{{ -# Tests whether the fd is opened -func fdTest(fd) { - var fdlink = "/proc/$$/fd/$fd" - if test -L $fdlink { - return (true) - } else { - return (false) - } +# Get fdInfo, returns null if fd doesn't exist +proc get-fd-info(; fd, out_fd_info) { + call out_fd_info->setValue($(cat /proc/$BASHPID/fdinfo/$fd)) } proc fd-destroy(; fd) { - if (fdTest(fd)) { - var fifoFile = $(readlink "/proc/$$/fd/$fd") + var fd_info = "" + try { + get-fd-info (fd, &fd_info) + } + if (_status === 0) { + var fifoFile = $(readlink "/proc/$BASHPID/fd/$fd") # NOTE: No need to close both end of a pipe. exec {fd}>&- rm $fifoFile @@ -154,15 +153,29 @@ proc rw-lock-new(; out_lock) { } proc rw-lock-read-out(; lock) { - # TODO: ensure we're actually holding the shared/exclusive lock - # if (lock.fd != null) ... - cat $[lock.lockfile] + try { + get-fd-info (lock.fd, &lock_fd_info) + } + if (_status === 0) { + # TODO: ensure lock is held with correct permission + cat $[lock.lockfile] + } else { + echo "No lock held at $[lock.fd]" 1>&2 + exit 1 + } } proc rw-lock-write-in(; lock) { - # TODO: ensure we're actually holding the exclusive lock - # if (lock.fd != null) ... - cat > $[lock.lockfile] + try { + get-fd-info (lock.fd, &lock_fd_info) + } + if (_status === 0) { + # TODO: ensure lock is held with correct permission + cat > $[lock.lockfile] + } else { + echo "No lock held at $[lock.fd]" 1>&2 + exit 1 + } } # NOTE: to change the type of a lock to T, e.g. from shared to exclusive, just call rw-lock-{T} again. @@ -178,12 +191,18 @@ proc rw-lock-exclusive(; lock) { flock -x $[lock.fd] } -# TODO: detect if locking proc rw-unlock(; lock) { - var lock_fd = lock.fd - setvar lock.fd = null - flock -u $lock_fd - exec {lock_fd}<&- + try { + get-fd-info (lock.fd, &lock_fd_info) + } + if (_status === 0) { + var lock_fd = lock.fd + flock -u $lock_fd + exec {lock_fd}<&- + } else { + echo "No lock held at $[lock.fd]" 1>&2 + exit 1 + } } proc rw-lock-destroy(; lock) { From 08263afc92094f65f32a434adfaee453a3675f24 Mon Sep 17 00:00:00 2001 From: glyh Date: Mon, 8 Apr 2024 04:48:44 +0800 Subject: [PATCH 35/52] correctly handle rw-lock permission --- stdlib/draft-synch.ysh | 40 +++++++++++++++++++++++++++++----------- 1 file changed, 29 insertions(+), 11 deletions(-) diff --git a/stdlib/draft-synch.ysh b/stdlib/draft-synch.ysh index 6c761b211b..bec4b6fb58 100644 --- a/stdlib/draft-synch.ysh +++ b/stdlib/draft-synch.ysh @@ -10,6 +10,30 @@ proc get-fd-info(; fd, out_fd_info) { call out_fd_info->setValue($(cat /proc/$BASHPID/fdinfo/$fd)) } +proc get-fd-flag(; fd, out_fd_flag) { + get-fd-info (fd, &fd_info) + if (fd_info ~ / 'flags:' \t /) { + call out_fd_flag->setValue(_match(1)) + } else { + echo "Can't find flags in fdinfo:" + echo $fd_info + exit 1 + } +} + +# O_RDONLY (00), O_WRONLY (01) & O_RDWR (02) +func isFdRead(fd) { + # HACK: this take one octal number direct from the number string, + # since currently there's no way to convert number bits in std. + get-fd-flag (fd, &fd_flag) + return (fd_flag[-1] === "0" or fd_flag[-1] === "2" ) +} + +func isFdWrite (fd) { + get-fd-flag (fd, &fd_flag) + return (fd_flag[-1] === "1" or fd_flag[-1] === "2") +} + proc fd-destroy(; fd) { var fd_info = "" try { @@ -153,27 +177,21 @@ proc rw-lock-new(; out_lock) { } proc rw-lock-read-out(; lock) { - try { - get-fd-info (lock.fd, &lock_fd_info) - } - if (_status === 0) { + if (isFdRead(lock.fd)) { # TODO: ensure lock is held with correct permission cat $[lock.lockfile] } else { - echo "No lock held at $[lock.fd]" 1>&2 + echo "No rwlock held at $[lock.fd]" 1>&2 exit 1 } } proc rw-lock-write-in(; lock) { - try { - get-fd-info (lock.fd, &lock_fd_info) - } - if (_status === 0) { + if (isFdWrite(lock.fd)) { # TODO: ensure lock is held with correct permission cat > $[lock.lockfile] } else { - echo "No lock held at $[lock.fd]" 1>&2 + echo "No exclusive lock held at rwlock $[lock.fd]" 1>&2 exit 1 } } @@ -200,7 +218,7 @@ proc rw-unlock(; lock) { flock -u $lock_fd exec {lock_fd}<&- } else { - echo "No lock held at $[lock.fd]" 1>&2 + echo "No rwlock held at $[lock.fd]" 1>&2 exit 1 } } From 29af63edffb73c639563d1d71920e4032ba5fec5 Mon Sep 17 00:00:00 2001 From: glyh Date: Mon, 8 Apr 2024 04:58:02 +0800 Subject: [PATCH 36/52] fix rw-lock destroy --- stdlib/draft-synch.ysh | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/stdlib/draft-synch.ysh b/stdlib/draft-synch.ysh index bec4b6fb58..30b6662bd8 100644 --- a/stdlib/draft-synch.ysh +++ b/stdlib/draft-synch.ysh @@ -224,11 +224,8 @@ proc rw-unlock(; lock) { } proc rw-lock-destroy(; lock) { - if (lock.fd !== null) { - fd-destroy (lock.fd) - } else { - rm $[lock.lockfile] - } + rw-lock-exclusive (lock) + fd-destroy (lock.fd) } proc rw-lock-swap-fn(; lock, fn) { From 650599343b7cf3899fa78fd1db1aa92cfe9bd290 Mon Sep 17 00:00:00 2001 From: glyh Date: Mon, 8 Apr 2024 05:00:37 +0800 Subject: [PATCH 37/52] should destroy lock after use --- spec/ysh-stdlib-synch.test.sh | 1 + 1 file changed, 1 insertion(+) diff --git a/spec/ysh-stdlib-synch.test.sh b/spec/ysh-stdlib-synch.test.sh index a746fa2ee6..a2cd53e775 100644 --- a/spec/ysh-stdlib-synch.test.sh +++ b/spec/ysh-stdlib-synch.test.sh @@ -148,6 +148,7 @@ for _ in (0..3) { sleep 0.1 write -n y wait +rw-lock-destroy (l) write ## STDOUT: wwwywww From 379297ef1f89ad384fe94d988d7b5456abc307a9 Mon Sep 17 00:00:00 2001 From: glyh Date: Mon, 8 Apr 2024 05:00:45 +0800 Subject: [PATCH 38/52] fix destroy logic --- stdlib/draft-synch.ysh | 2 -- 1 file changed, 2 deletions(-) diff --git a/stdlib/draft-synch.ysh b/stdlib/draft-synch.ysh index 30b6662bd8..f715cb8b24 100644 --- a/stdlib/draft-synch.ysh +++ b/stdlib/draft-synch.ysh @@ -324,8 +324,6 @@ proc exh-channel-destroy(; chan) { # Ensures no one else is working mutex-acquire (chan.read_lock) mutex-acquire (chan.write_lock) - rw-lock-exclusive (chan.will_write) - rw-lock-exclusive (chan.message_count) # Clean up fifo-fd-destroy (chan.pipe) rw-lock-destroy (chan.message_count) From 16364b3acc3f226759a101248c9afcf1bf274ce2 Mon Sep 17 00:00:00 2001 From: glyh Date: Mon, 8 Apr 2024 07:52:41 +0800 Subject: [PATCH 39/52] add some notes --- stdlib/draft-synch.ysh | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/stdlib/draft-synch.ysh b/stdlib/draft-synch.ysh index f715cb8b24..978d32403a 100644 --- a/stdlib/draft-synch.ysh +++ b/stdlib/draft-synch.ysh @@ -116,6 +116,7 @@ proc mutex-destroy(; mutex) { # - Pipeline but you can send multiple objects across pipelines # - Has to manually ensure the number of sent/received blocks are same across each side. # - Blocks, no buffering +# - Backed by a modified net-string implementation const ZERO_CHAR = u'\u{0}' @@ -130,6 +131,10 @@ proc channel-new(; out_chan) { }) } +# NOTE: I would love to optimize this a bit more, for example netstring of size n +# now takes log_10(n) over head. We can certainly do this better by byte encoding +# It would be log_128(n) (using 1 bit for indicating the number ends) +# That's a ln 128 / ln 10 which is roughly twice less overhead. proc channel-in(; chan) { var msg = $(cat) # consume everything from input var msg_len = len(msg) From 85eb607c9e8c0aaf3c8809f5a5a724802c7cd9b6 Mon Sep 17 00:00:00 2001 From: glyh Date: Mon, 8 Apr 2024 08:17:04 +0800 Subject: [PATCH 40/52] more notes --- stdlib/draft-synch.ysh | 1 + 1 file changed, 1 insertion(+) diff --git a/stdlib/draft-synch.ysh b/stdlib/draft-synch.ysh index 978d32403a..1314e1b9fb 100644 --- a/stdlib/draft-synch.ysh +++ b/stdlib/draft-synch.ysh @@ -117,6 +117,7 @@ proc mutex-destroy(; mutex) { # - Has to manually ensure the number of sent/received blocks are same across each side. # - Blocks, no buffering # - Backed by a modified net-string implementation +# - We may may modify this to replace with an implementation that allows streaming, however. const ZERO_CHAR = u'\u{0}' From 04e88779afa7f8023eff7c023cd47b11ff0ec320 Mon Sep 17 00:00:00 2001 From: glyh Date: Mon, 8 Apr 2024 08:23:59 +0800 Subject: [PATCH 41/52] prefers j8 instead of json for lossless information --- stdlib/draft-synch.ysh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/stdlib/draft-synch.ysh b/stdlib/draft-synch.ysh index 1314e1b9fb..d729d8e31f 100644 --- a/stdlib/draft-synch.ysh +++ b/stdlib/draft-synch.ysh @@ -312,7 +312,7 @@ proc exh-channel-exhaust(; chan, out_ret) { rw-lock-exclusive (chan.will_write) # Count how many message we have rw-lock-exclusive (chan.message_count) - rw-lock-read-out (chan.message_count) | json read (&num_msg) + rw-lock-read-out (chan.message_count) | json8 read (&num_msg) write -n 0 | rw-lock-write-in (chan.message_count) rw-unlock (chan.message_count) From 99b0ec7ef821e7e75d6254d6b92c142535fa3ddb Mon Sep 17 00:00:00 2001 From: glyh Date: Tue, 9 Apr 2024 10:33:38 +0800 Subject: [PATCH 42/52] more note --- stdlib/draft-synch.ysh | 1 + 1 file changed, 1 insertion(+) diff --git a/stdlib/draft-synch.ysh b/stdlib/draft-synch.ysh index d729d8e31f..b52424ed6b 100644 --- a/stdlib/draft-synch.ysh +++ b/stdlib/draft-synch.ysh @@ -269,6 +269,7 @@ proc exh-channel-new(; out_chan) { }) } +# TODO: when lambda landed, this can be refactor to a parse chained to an inc. func __synch_exh_channel_inc_untyped (buf) { var typed = buf => int() var swapped = typed + 1 From d2882aed439af44c5a098c671ef1004d50981306 Mon Sep 17 00:00:00 2001 From: glyh Date: Tue, 9 Apr 2024 13:31:47 +0800 Subject: [PATCH 43/52] refactor out stdlib/descriptor --- stdlib/draft-descriptor.ysh | 89 +++++++++++++++++++++++++++++++++++++ stdlib/draft-synch.ysh | 70 ++--------------------------- 2 files changed, 93 insertions(+), 66 deletions(-) create mode 100644 stdlib/draft-descriptor.ysh diff --git a/stdlib/draft-descriptor.ysh b/stdlib/draft-descriptor.ysh new file mode 100644 index 0000000000..85391f62d3 --- /dev/null +++ b/stdlib/draft-descriptor.ysh @@ -0,0 +1,89 @@ +#!/usr/bin/env ysh +# vim:foldmethod=marker + +module stdlib/descriptor || return 0 + +# General utilities for file descriptors {{{ + +# NOTE: we need a proper exception system. +func getFdFile(fd) { + var fd_file + try { + setvar fd_file = $(readlink "/proc/$BASHPID/fd/$fd") + } + if (_status === 0) { + return (fd_file) + } else { + { + echo "File descriptor $fd is not opened by process $BASHPID!" + } >&2 + exit 1 + } +} + +func getFdInfo(fd) { + var fd_info = "/proc/$BASHPID/fdinfo/$fd" + if test -f $fd_info { + return ($(cat $fd_info)) + } else { + { + echo "File descriptor $fd is not opened by process $BASHPID!" + } >&2 + exit 1 + } +} + +func getFdFlag(fd) { + var fd_info = getFdInfo(fd) + if (fd_info ~ / 'flags:' \t /) { + return (_match(1)) + } else { + { + echo "Can't find flags in for file descriptor $fd of process $BASHPID, here's the content:" + echo $fd_info + } >&2 + exit 1 + } +} + +# O_RDONLY (00), O_WRONLY (01) & O_RDWR (02) +func isFdRead(fd) { + # HACK: this take one octal number direct from the number string, + # since currently there's no way to convert number bits in std. + var fd_flag = getFdFlag(fd) + return (fd_flag[-1] === "0" or fd_flag[-1] === "2" ) +} + +func isFdWrite (fd) { + var fd_flag = getFdFlag(fd) + return (fd_flag[-1] === "1" or fd_flag[-1] === "2") +} + +# More strict than the standard version +proc fd-destroy-and-rm-file(fd) { + var fd_file = getFdFile(fd) + # NOTE: bash treat >&- and <&- the same, not sure for ysh's case + # REFERENCE: https://unix.stackexchange.com/questions/131801/closing-a-file-descriptor-vs + exec {fd}>&- + rm $fd_file +} + +# }}} +# FIFO File Descriptors {{{ + +proc fifo-fd-new(; out_fd) { + # WARN: this section should be critical but for now it's not + # A solution may be retry on fail. + #==================== + var fifo = $(mktemp -u) + mkfifo $fifo + #==================== + exec {fd}<>$fifo + call out_fd->setValue(fd) +} + +proc fifo-fd-destroy (; fd) { + fd-destroy-and-rm-file $fd +} + +# }}} diff --git a/stdlib/draft-synch.ysh b/stdlib/draft-synch.ysh index b52424ed6b..5cd57565bb 100644 --- a/stdlib/draft-synch.ysh +++ b/stdlib/draft-synch.ysh @@ -3,71 +3,8 @@ module stdlib/synch || return 0 -# General utilities for fds {{{ +source --builtin draft-descriptor.ysh -# Get fdInfo, returns null if fd doesn't exist -proc get-fd-info(; fd, out_fd_info) { - call out_fd_info->setValue($(cat /proc/$BASHPID/fdinfo/$fd)) -} - -proc get-fd-flag(; fd, out_fd_flag) { - get-fd-info (fd, &fd_info) - if (fd_info ~ / 'flags:' \t /) { - call out_fd_flag->setValue(_match(1)) - } else { - echo "Can't find flags in fdinfo:" - echo $fd_info - exit 1 - } -} - -# O_RDONLY (00), O_WRONLY (01) & O_RDWR (02) -func isFdRead(fd) { - # HACK: this take one octal number direct from the number string, - # since currently there's no way to convert number bits in std. - get-fd-flag (fd, &fd_flag) - return (fd_flag[-1] === "0" or fd_flag[-1] === "2" ) -} - -func isFdWrite (fd) { - get-fd-flag (fd, &fd_flag) - return (fd_flag[-1] === "1" or fd_flag[-1] === "2") -} - -proc fd-destroy(; fd) { - var fd_info = "" - try { - get-fd-info (fd, &fd_info) - } - if (_status === 0) { - var fifoFile = $(readlink "/proc/$BASHPID/fd/$fd") - # NOTE: No need to close both end of a pipe. - exec {fd}>&- - rm $fifoFile - } else { - echo "Double Destroying the fd $fd twice!" 1>&2 - exit 1 - } -} -# }}} -# FIFO File Descriptors {{{ - -proc fifo-fd-new(; out_fd) { - # WARN: this section should be critical but for now it's not - # A solution may be retry on fail. - #==================== - var fifo = $(mktemp -u) - mkfifo $fifo - #==================== - exec {fd}<>$fifo - call out_fd->setValue(fd) -} - -proc fifo-fd-destroy (; fd) { - fd-destroy (fd) -} - -# }}} # Semaphores {{{ proc sema-new(; value, out_sema) { @@ -216,8 +153,9 @@ proc rw-lock-exclusive(; lock) { } proc rw-unlock(; lock) { + var lock_fd_info try { - get-fd-info (lock.fd, &lock_fd_info) + setvar lock_fd_info = getFdInfo(lock.fd) } if (_status === 0) { var lock_fd = lock.fd @@ -231,7 +169,7 @@ proc rw-unlock(; lock) { proc rw-lock-destroy(; lock) { rw-lock-exclusive (lock) - fd-destroy (lock.fd) + fd-destroy-and-rm-file $[lock.fd] } proc rw-lock-swap-fn(; lock, fn) { From 7fb7423baa09049e9593302060d1b73a590129e5 Mon Sep 17 00:00:00 2001 From: glyh Date: Tue, 9 Apr 2024 13:35:42 +0800 Subject: [PATCH 44/52] rename: stdlib/synch -> stdlib/sync --- ...lib-synch.test.sh => ysh-stdlib-sync.test.sh} | 16 ++++++++-------- stdlib/{draft-synch.ysh => draft-sync.ysh} | 6 +++--- test/spec.sh | 4 ++-- 3 files changed, 13 insertions(+), 13 deletions(-) rename spec/{ysh-stdlib-synch.test.sh => ysh-stdlib-sync.test.sh} (89%) rename stdlib/{draft-synch.ysh => draft-sync.ysh} (97%) diff --git a/spec/ysh-stdlib-synch.test.sh b/spec/ysh-stdlib-sync.test.sh similarity index 89% rename from spec/ysh-stdlib-synch.test.sh rename to spec/ysh-stdlib-sync.test.sh index a2cd53e775..65823da004 100644 --- a/spec/ysh-stdlib-synch.test.sh +++ b/spec/ysh-stdlib-sync.test.sh @@ -1,9 +1,9 @@ -# spec/ysh-stdlib +# spec/ysh-stdlib-sync ## our_shell: ysh #### fifo pipe double closes -source --builtin draft-synch.ysh +source --builtin draft-sync.ysh fifo-fd-new (&fd) try { @@ -17,7 +17,7 @@ fifo-fd-destroy (fd) ## END #### semaphore syncrhonizing async jobs -source --builtin draft-synch.ysh +source --builtin draft-sync.ysh sema-new (1, &s) fork { @@ -51,7 +51,7 @@ sema-destroy (s) ## END #### semaphore init with 3, async up once and multiple down -source --builtin draft-synch.ysh +source --builtin draft-sync.ysh sema-new (3, &s) fork { @@ -68,7 +68,7 @@ yes ## END #### channel reads and writes -source --builtin draft-synch.ysh +source --builtin draft-sync.ysh channel-new (&ch) @@ -93,7 +93,7 @@ channel-destroy (ch) ## END #### RWLock multiple shared lock and free, one exclusive lock -source --builtin draft-synch.ysh +source --builtin draft-sync.ysh rw-lock-new (&lk) @@ -126,7 +126,7 @@ rw-lock-destroy (lk) ## END #### Reading and writing rw-lock -source --builtin draft-synch.ysh +source --builtin draft-sync.ysh rw-lock-new (&l) fork { @@ -155,7 +155,7 @@ wwwywww ## END #### Produce many value and exhaust the channel, and then reuse it -source --builtin draft-synch.ysh +source --builtin draft-sync.ysh exh-channel-new (&ch) diff --git a/stdlib/draft-synch.ysh b/stdlib/draft-sync.ysh similarity index 97% rename from stdlib/draft-synch.ysh rename to stdlib/draft-sync.ysh index 5cd57565bb..46033e630e 100644 --- a/stdlib/draft-synch.ysh +++ b/stdlib/draft-sync.ysh @@ -1,7 +1,7 @@ #!/usr/bin/env ysh # vim:foldmethod=marker -module stdlib/synch || return 0 +module stdlib/sync || return 0 source --builtin draft-descriptor.ysh @@ -208,7 +208,7 @@ proc exh-channel-new(; out_chan) { } # TODO: when lambda landed, this can be refactor to a parse chained to an inc. -func __synch_exh_channel_inc_untyped (buf) { +func __sync_exh_channel_inc_untyped (buf) { var typed = buf => int() var swapped = typed + 1 var untyped = "$[swapped]" @@ -218,7 +218,7 @@ func __synch_exh_channel_inc_untyped (buf) { proc exh-channel-in(; chan) { var msg = $(cat) # consume everything from input var msg_len = len(msg) - rw-lock-swap-fn (chan.message_count, __synch_exh_channel_inc_untyped) + rw-lock-swap-fn (chan.message_count, __sync_exh_channel_inc_untyped) rw-lock-shared (chan.will_write) mutex-acquire (chan.write_lock) write -n $msg_len$ZERO_CHAR$msg >&$[chan.pipe] diff --git a/test/spec.sh b/test/spec.sh index be2d43b9b9..104d5111fd 100755 --- a/test/spec.sh +++ b/test/spec.sh @@ -692,8 +692,8 @@ ysh-stdlib-testing() { run-file ysh-stdlib-testing "$@" } -ysh-stdlib-synch() { - run-file ysh-stdlib-synch "$@" +ysh-stdlib-sync() { + run-file ysh-stdlib-sync "$@" } ysh-source() { From fcd78e7dd78321750ad899ae6804b13b9aeba41d Mon Sep 17 00:00:00 2001 From: glyh Date: Tue, 9 Apr 2024 14:03:19 +0800 Subject: [PATCH 45/52] factor out stdlib/pipe --- stdlib/draft-pipe.ysh | 36 ++++++++++++++++++++++++++++++++++++ stdlib/draft-sync.ysh | 37 ++++++++++++++++--------------------- 2 files changed, 52 insertions(+), 21 deletions(-) create mode 100644 stdlib/draft-pipe.ysh diff --git a/stdlib/draft-pipe.ysh b/stdlib/draft-pipe.ysh new file mode 100644 index 0000000000..e8d0d29851 --- /dev/null +++ b/stdlib/draft-pipe.ysh @@ -0,0 +1,36 @@ +module stdlib/pipe || return 0 + +# NOTE: I would love to optimize this a bit more, for example netstring of size n +# now takes log_10(n) over head. We can certainly do this better by byte encoding +# It would be log_128(n) (using 1 bit for indicating the number ends) +# That's a ln 128 / ln 10 which is roughly twice less overhead. +proc netstring-pipe-in () { + var msg = $(cat) # consume everything from input + write -n $[len(msg)]${ZERO_CHAR}${msg} +} + +proc netstring-pipe-out() { + var msg_len = "" + while (true) { + read -n 1 next_char + if (next_char !== ZERO_CHAR) { + setvar msg_len = msg_len ++ next_char + } else { + break + } + } + read -n $msg_len chunk + write -n $chunk +} + +# NOTE: No way to refer to procs, I wrap them around funcs for now. +func __pipe_netstring_pipe_in_wrap() { + netstring-pipe-in +} +func __pipe_netstring_pipe_out_wrap() { + netstring-pipe-out +} +const Pipe_Methods_NetString = { + in_pipe: __pipe_netstring_pipe_in_wrap, + out_pipe: __pipe_netstring_pipe_out_wrap, +} diff --git a/stdlib/draft-sync.ysh b/stdlib/draft-sync.ysh index 46033e630e..573a0efc98 100644 --- a/stdlib/draft-sync.ysh +++ b/stdlib/draft-sync.ysh @@ -4,6 +4,7 @@ module stdlib/sync || return 0 source --builtin draft-descriptor.ysh +source --builtin draft-pipe.ysh # Semaphores {{{ @@ -58,7 +59,9 @@ proc mutex-destroy(; mutex) { const ZERO_CHAR = u'\u{0}' -proc channel-new(; out_chan) { +proc channel-new(;out_chan, pipe_in = __pipe_netstring_pipe_in_wrap, pipe_out = __pipe_netstring_pipe_out_wrap, ) { +# NOTE: Wait for dicts to be able to be set for default fields +# proc channel-new(;out_chan, methods = Pipe_Methods_NetString) { mutex-new (&write_lock) mutex-new (&read_lock) fifo-fd-new (&pipe) @@ -66,35 +69,27 @@ proc channel-new(; out_chan) { write_lock, read_lock, pipe, + methods: { + in_pipe: pipe_in, + out_pipe: pipe_out, + }, }) } -# NOTE: I would love to optimize this a bit more, for example netstring of size n -# now takes log_10(n) over head. We can certainly do this better by byte encoding -# It would be log_128(n) (using 1 bit for indicating the number ends) -# That's a ln 128 / ln 10 which is roughly twice less overhead. proc channel-in(; chan) { - var msg = $(cat) # consume everything from input - var msg_len = len(msg) mutex-acquire (chan.write_lock) - write -n ${msg_len}${ZERO_CHAR}${msg} >&$[chan.pipe] + { + call chan.methods.in_pipe() + } >&$[chan.pipe] mutex-release (chan.write_lock) } proc channel-out(; chan) { - var msg_len = "" - mutex-acquire (chan.read_lock) - while (true) { - read -n 1 next_char <&$[chan.pipe] - if (next_char !== ZERO_CHAR) { - setvar msg_len = msg_len ++ next_char - } else { - break - } - } - read -n $msg_len chunk <&$[chan.pipe] - write -n $chunk - mutex-release (chan.read_lock) + mutex-acquire (chan.write_lock) + { + call chan.methods.out_pipe() + } <&$[chan.pipe] + mutex-release (chan.write_lock) } proc channel-destroy(; chan) { From c11fa8ddd08bcb350913b644e781ced1714d2ea0 Mon Sep 17 00:00:00 2001 From: glyh Date: Tue, 9 Apr 2024 14:38:37 +0800 Subject: [PATCH 46/52] also factors pipemethods outof exhaustable channel --- stdlib/draft-sync.ysh | 26 +++++++++++--------------- 1 file changed, 11 insertions(+), 15 deletions(-) diff --git a/stdlib/draft-sync.ysh b/stdlib/draft-sync.ysh index 573a0efc98..7e455a26d4 100644 --- a/stdlib/draft-sync.ysh +++ b/stdlib/draft-sync.ysh @@ -182,7 +182,7 @@ proc rw-lock-swap-fn(; lock, fn) { # Exhaustable Channels {{{ # Channels but exhaustable -proc exh-channel-new(; out_chan) { +proc exh-channel-new(; out_chan, pipe_in = __pipe_netstring_pipe_in_wrap, pipe_out = __pipe_netstring_pipe_out_wrap) { mutex-new (&write_lock) mutex-new (&read_lock) fifo-fd-new (&pipe) @@ -197,6 +197,10 @@ proc exh-channel-new(; out_chan) { write_lock, read_lock, pipe, + methods: { + in_pipe: pipe_in, + out_pipe: pipe_out, + }, message_count, will_write, }) @@ -211,28 +215,20 @@ func __sync_exh_channel_inc_untyped (buf) { } proc exh-channel-in(; chan) { - var msg = $(cat) # consume everything from input - var msg_len = len(msg) rw-lock-swap-fn (chan.message_count, __sync_exh_channel_inc_untyped) rw-lock-shared (chan.will_write) mutex-acquire (chan.write_lock) - write -n $msg_len$ZERO_CHAR$msg >&$[chan.pipe] + { + call chan.methods.in_pipe() + } >&$[chan.pipe] mutex-release (chan.write_lock) rw-unlock (chan.will_write) } proc __exh_channel_out_no_lock(; chan) { - var msg_len = "" - while (true) { - read -n 1 next_char <&$[chan.pipe] - if (next_char !== ZERO_CHAR) { - setvar msg_len = msg_len ++ next_char - } else { - break - } - } - read -n $msg_len chunk <&$[chan.pipe] - write -n $chunk + { + call chan.methods.out_pipe() + } <&$[chan.pipe] } proc exh-channel-out(; chan) { From e29fe236bbfa4a73bd58b9b239533c7c3c1edfe1 Mon Sep 17 00:00:00 2001 From: glyh Date: Tue, 9 Apr 2024 14:41:08 +0800 Subject: [PATCH 47/52] add shebang and markers in stdlib/pipe --- stdlib/draft-pipe.ysh | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/stdlib/draft-pipe.ysh b/stdlib/draft-pipe.ysh index e8d0d29851..00c16621e5 100644 --- a/stdlib/draft-pipe.ysh +++ b/stdlib/draft-pipe.ysh @@ -1,5 +1,8 @@ +#!/usr/bin/env ysh +# vim:foldmethod=marker module stdlib/pipe || return 0 +# Netstring {{{ # NOTE: I would love to optimize this a bit more, for example netstring of size n # now takes log_10(n) over head. We can certainly do this better by byte encoding # It would be log_128(n) (using 1 bit for indicating the number ends) @@ -22,7 +25,6 @@ proc netstring-pipe-out() { read -n $msg_len chunk write -n $chunk } - # NOTE: No way to refer to procs, I wrap them around funcs for now. func __pipe_netstring_pipe_in_wrap() { netstring-pipe-in @@ -30,6 +32,8 @@ func __pipe_netstring_pipe_in_wrap() { func __pipe_netstring_pipe_out_wrap() { netstring-pipe-out } +# }}} + const Pipe_Methods_NetString = { in_pipe: __pipe_netstring_pipe_in_wrap, out_pipe: __pipe_netstring_pipe_out_wrap, From 05c898b3eb35c739f16bf3e37f3165adc7837d47 Mon Sep 17 00:00:00 2001 From: glyh Date: Tue, 9 Apr 2024 14:54:34 +0800 Subject: [PATCH 48/52] rename RWLock to Atom --- spec/ysh-stdlib-sync.test.sh | 36 ++++++++++---------- stdlib/draft-sync.ysh | 64 ++++++++++++++++++------------------ 2 files changed, 50 insertions(+), 50 deletions(-) diff --git a/spec/ysh-stdlib-sync.test.sh b/spec/ysh-stdlib-sync.test.sh index 65823da004..dab384e82d 100644 --- a/spec/ysh-stdlib-sync.test.sh +++ b/spec/ysh-stdlib-sync.test.sh @@ -95,28 +95,28 @@ channel-destroy (ch) #### RWLock multiple shared lock and free, one exclusive lock source --builtin draft-sync.ysh -rw-lock-new (&lk) +atom-new (&lk) fork { - rw-lock-shared (lk) + atom-lock-shared (lk) echo 1 sleep 0.3 - rw-unlock (lk) + atom-unlock (lk) } for _ in (0..3) { fork { sleep 0.1 - rw-lock-shared (lk) + atom-lock-shared (lk) echo 2 sleep 0.2 - rw-unlock (lk) + atom-unlock (lk) } } sleep 0.1 -rw-lock-exclusive (lk) +atom-lock-exclusive (lk) echo 3 -rw-unlock (lk) -rw-lock-destroy (lk) +atom-unlock (lk) +atom-destroy (lk) ## STDOUT: 1 2 @@ -125,30 +125,30 @@ rw-lock-destroy (lk) 3 ## END -#### Reading and writing rw-lock +#### Reading and writing atom source --builtin draft-sync.ysh -rw-lock-new (&l) +atom-new (&l) fork { - rw-lock-exclusive (l) - write -n 'w' | rw-lock-write-in (l) - rw-unlock (l) + atom-lock-exclusive (l) + write -n 'w' | atom-write-in (l) + atom-unlock (l) } sleep 0.1 for _ in (0..3) { fork { - rw-lock-shared (l) - rw-lock-read-out (l) + atom-lock-shared (l) + atom-read-out (l) sleep 0.2 - rw-lock-read-out (l) - rw-unlock (l) + atom-read-out (l) + atom-unlock (l) } } sleep 0.1 write -n y wait -rw-lock-destroy (l) +atom-destroy (l) write ## STDOUT: wwwywww diff --git a/stdlib/draft-sync.ysh b/stdlib/draft-sync.ysh index 7e455a26d4..182e9e37b3 100644 --- a/stdlib/draft-sync.ysh +++ b/stdlib/draft-sync.ysh @@ -103,9 +103,9 @@ proc channel-destroy(; chan) { } # }}} -# RWLock {{{ +# Atom {{{ -proc rw-lock-new(; out_lock) { +proc atom-new(; out_lock) { var lockfile = $(mktemp) var lock = { fd: null, @@ -114,7 +114,7 @@ proc rw-lock-new(; out_lock) { call out_lock->setValue(lock) } -proc rw-lock-read-out(; lock) { +proc atom-read-out(; lock) { if (isFdRead(lock.fd)) { # TODO: ensure lock is held with correct permission cat $[lock.lockfile] @@ -124,7 +124,7 @@ proc rw-lock-read-out(; lock) { } } -proc rw-lock-write-in(; lock) { +proc atom-write-in(; lock) { if (isFdWrite(lock.fd)) { # TODO: ensure lock is held with correct permission cat > $[lock.lockfile] @@ -134,20 +134,20 @@ proc rw-lock-write-in(; lock) { } } -# NOTE: to change the type of a lock to T, e.g. from shared to exclusive, just call rw-lock-{T} again. -proc rw-lock-shared(; lock) { +# NOTE: to change the type of a lock to T, e.g. from shared to exclusive, just call atom-{T} again. +proc atom-lock-shared(; lock) { exec {lock_fd}<$[lock.lockfile] setvar lock.fd = lock_fd flock -s $[lock.fd] } -proc rw-lock-exclusive(; lock) { +proc atom-lock-exclusive(; lock) { exec {lock_fd}<>$[lock.lockfile] setvar lock.fd = lock_fd flock -x $[lock.fd] } -proc rw-unlock(; lock) { +proc atom-unlock(; lock) { var lock_fd_info try { setvar lock_fd_info = getFdInfo(lock.fd) @@ -162,21 +162,21 @@ proc rw-unlock(; lock) { } } -proc rw-lock-destroy(; lock) { - rw-lock-exclusive (lock) +proc atom-destroy(; lock) { + atom-lock-exclusive (lock) fd-destroy-and-rm-file $[lock.fd] } -proc rw-lock-swap-fn(; lock, fn) { - rw-lock-exclusive (lock) - var swapped = $(rw-lock-read-out (lock)) => fn() - write -n $swapped | rw-lock-write-in (lock) - rw-unlock (lock) +proc atom-swap-fn(; lock, fn) { + atom-lock-exclusive (lock) + var swapped = $(atom-read-out (lock)) => fn() + write -n $swapped | atom-write-in (lock) + atom-unlock (lock) } # TODO: Performs an lock-guarded write on a lock's file # The issue is we need to know when to close the pipe -# proc rw-lock-swap-pipe(; lock) +# proc atom-swap-pipe(; lock) # }}} # Exhaustable Channels {{{ @@ -187,12 +187,12 @@ proc exh-channel-new(; out_chan, pipe_in = __pipe_netstring_pipe_in_wrap, pipe_o mutex-new (&read_lock) fifo-fd-new (&pipe) # a counter on how many information we have on the pipe - rw-lock-new (&message_count) + atom-new (&message_count) # a lock indicating if there's upcoming writes - rw-lock-new (&will_write) - rw-lock-exclusive (message_count) - write -n 0 | rw-lock-write-in (message_count) - rw-unlock (message_count) + atom-new (&will_write) + atom-lock-exclusive (message_count) + write -n 0 | atom-write-in (message_count) + atom-unlock (message_count) call out_chan->setValue({ write_lock, read_lock, @@ -215,14 +215,14 @@ func __sync_exh_channel_inc_untyped (buf) { } proc exh-channel-in(; chan) { - rw-lock-swap-fn (chan.message_count, __sync_exh_channel_inc_untyped) - rw-lock-shared (chan.will_write) + atom-swap-fn (chan.message_count, __sync_exh_channel_inc_untyped) + atom-lock-shared (chan.will_write) mutex-acquire (chan.write_lock) { call chan.methods.in_pipe() } >&$[chan.pipe] mutex-release (chan.write_lock) - rw-unlock (chan.will_write) + atom-unlock (chan.will_write) } proc __exh_channel_out_no_lock(; chan) { @@ -239,12 +239,12 @@ proc exh-channel-out(; chan) { proc exh-channel-exhaust(; chan, out_ret) { # No body should send anything to the channel from now on - rw-lock-exclusive (chan.will_write) + atom-lock-exclusive (chan.will_write) # Count how many message we have - rw-lock-exclusive (chan.message_count) - rw-lock-read-out (chan.message_count) | json8 read (&num_msg) - write -n 0 | rw-lock-write-in (chan.message_count) - rw-unlock (chan.message_count) + atom-lock-exclusive (chan.message_count) + atom-read-out (chan.message_count) | json8 read (&num_msg) + write -n 0 | atom-write-in (chan.message_count) + atom-unlock (chan.message_count) var ret = [] mutex-acquire (chan.read_lock) @@ -252,7 +252,7 @@ proc exh-channel-exhaust(; chan, out_ret) { call ret->append($(__exh_channel_out_no_lock (chan))) } mutex-release (chan.read_lock) - rw-unlock (chan.will_write) + atom-unlock (chan.will_write) call out_ret->setValue(ret) } @@ -262,8 +262,8 @@ proc exh-channel-destroy(; chan) { mutex-acquire (chan.write_lock) # Clean up fifo-fd-destroy (chan.pipe) - rw-lock-destroy (chan.message_count) - rw-lock-destroy (chan.will_write) + atom-destroy (chan.message_count) + atom-destroy (chan.will_write) mutex-destroy (chan.write_lock) mutex-destroy (chan.read_lock) } From c421e44c10307d2581e03b090419fc451cbaf5ce Mon Sep 17 00:00:00 2001 From: glyh Date: Tue, 9 Apr 2024 16:05:29 +0800 Subject: [PATCH 49/52] implement blocked piping --- stdlib/draft-pipe.ysh | 63 ++++++++++++++++++++++++++++++++++++------- stdlib/draft-sync.ysh | 3 --- 2 files changed, 53 insertions(+), 13 deletions(-) diff --git a/stdlib/draft-pipe.ysh b/stdlib/draft-pipe.ysh index 00c16621e5..914ae885e9 100644 --- a/stdlib/draft-pipe.ysh +++ b/stdlib/draft-pipe.ysh @@ -1,30 +1,73 @@ #!/usr/bin/env ysh # vim:foldmethod=marker + module stdlib/pipe || return 0 -# Netstring {{{ +const __pipe_NUM_DELIM = u'$' + # NOTE: I would love to optimize this a bit more, for example netstring of size n # now takes log_10(n) over head. We can certainly do this better by byte encoding # It would be log_128(n) (using 1 bit for indicating the number ends) # That's a ln 128 / ln 10 which is roughly twice less overhead. -proc netstring-pipe-in () { - var msg = $(cat) # consume everything from input - write -n $[len(msg)]${ZERO_CHAR}${msg} +proc zero-delim-num-pipe-in (; num) { + write -n $num$__pipe_NUM_DELIM } -proc netstring-pipe-out() { - var msg_len = "" +proc zero-delim-num-pipe-out (; out_num) { + var num_str = "" while (true) { read -n 1 next_char - if (next_char !== ZERO_CHAR) { - setvar msg_len = msg_len ++ next_char + if (next_char !== __pipe_NUM_DELIM) { + setvar num_str = num_str ++ next_char } else { break } } - read -n $msg_len chunk - write -n $chunk + call out_num->setValue(num_str => int()) } + +proc blocked-pipe-in (; block_size) { + while (true) { + read -n $block_size chunk + var last_chunk = len(chunk) !== block_size + if (last_chunk) { + write -n 1 + zero-delim-num-pipe-in (len(last_chunk)) + write -n ${chunk} + break + } else { + write -n 0 + write -n ${chunk} + } + } +} + +proc blocked-pipe-out (; block_size) { + while (true) { + read -n 1 last_chunk + if (last_chunk) { + zero-delim-num-pipe-out (&len_last_chunk) + read -n $len_last_chunk + break + } else { + read -n $block_size chunk + } + } +} + +# Netstring {{{ +proc netstring-pipe-in () { + var msg = $(cat) # consume everything from input + zero-delim-num-pipe-in (len(msg)) + write -n $msg +} + +proc netstring-pipe-out() { + zero-delim-num-pipe-out (&msg_len) + read -n $msg_len msg + write -n $msg +} + # NOTE: No way to refer to procs, I wrap them around funcs for now. func __pipe_netstring_pipe_in_wrap() { netstring-pipe-in diff --git a/stdlib/draft-sync.ysh b/stdlib/draft-sync.ysh index 182e9e37b3..80a77c0fae 100644 --- a/stdlib/draft-sync.ysh +++ b/stdlib/draft-sync.ysh @@ -2,7 +2,6 @@ # vim:foldmethod=marker module stdlib/sync || return 0 - source --builtin draft-descriptor.ysh source --builtin draft-pipe.ysh @@ -57,8 +56,6 @@ proc mutex-destroy(; mutex) { # - Backed by a modified net-string implementation # - We may may modify this to replace with an implementation that allows streaming, however. -const ZERO_CHAR = u'\u{0}' - proc channel-new(;out_chan, pipe_in = __pipe_netstring_pipe_in_wrap, pipe_out = __pipe_netstring_pipe_out_wrap, ) { # NOTE: Wait for dicts to be able to be set for default fields # proc channel-new(;out_chan, methods = Pipe_Methods_NetString) { From c3343292aac0598a88201ba0488e5ea7cc391069 Mon Sep 17 00:00:00 2001 From: glyh Date: Tue, 9 Apr 2024 16:06:21 +0800 Subject: [PATCH 50/52] rename zero-delim-num-* to delim-num-* --- stdlib/draft-pipe.ysh | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/stdlib/draft-pipe.ysh b/stdlib/draft-pipe.ysh index 914ae885e9..2bf03af25f 100644 --- a/stdlib/draft-pipe.ysh +++ b/stdlib/draft-pipe.ysh @@ -9,11 +9,11 @@ const __pipe_NUM_DELIM = u'$' # now takes log_10(n) over head. We can certainly do this better by byte encoding # It would be log_128(n) (using 1 bit for indicating the number ends) # That's a ln 128 / ln 10 which is roughly twice less overhead. -proc zero-delim-num-pipe-in (; num) { +proc delim-num-pipe-in (; num) { write -n $num$__pipe_NUM_DELIM } -proc zero-delim-num-pipe-out (; out_num) { +proc delim-num-pipe-out (; out_num) { var num_str = "" while (true) { read -n 1 next_char @@ -32,7 +32,7 @@ proc blocked-pipe-in (; block_size) { var last_chunk = len(chunk) !== block_size if (last_chunk) { write -n 1 - zero-delim-num-pipe-in (len(last_chunk)) + delim-num-pipe-in (len(last_chunk)) write -n ${chunk} break } else { @@ -46,7 +46,7 @@ proc blocked-pipe-out (; block_size) { while (true) { read -n 1 last_chunk if (last_chunk) { - zero-delim-num-pipe-out (&len_last_chunk) + delim-num-pipe-out (&len_last_chunk) read -n $len_last_chunk break } else { @@ -58,12 +58,12 @@ proc blocked-pipe-out (; block_size) { # Netstring {{{ proc netstring-pipe-in () { var msg = $(cat) # consume everything from input - zero-delim-num-pipe-in (len(msg)) + delim-num-pipe-in (len(msg)) write -n $msg } proc netstring-pipe-out() { - zero-delim-num-pipe-out (&msg_len) + delim-num-pipe-out (&msg_len) read -n $msg_len msg write -n $msg } From eb293833b3b6bd8eb72d48e598e531e9a305b2bc Mon Sep 17 00:00:00 2001 From: glyh Date: Tue, 9 Apr 2024 17:12:18 +0800 Subject: [PATCH 51/52] implement blocked pipe --- spec/ysh-stdlib-sync.test.sh | 25 +++++++++++++++++++++++++ stdlib/draft-pipe.ysh | 33 ++++++++++++++------------------- stdlib/draft-sync.ysh | 6 +++--- 3 files changed, 42 insertions(+), 22 deletions(-) diff --git a/spec/ysh-stdlib-sync.test.sh b/spec/ysh-stdlib-sync.test.sh index dab384e82d..0d0eba6257 100644 --- a/spec/ysh-stdlib-sync.test.sh +++ b/spec/ysh-stdlib-sync.test.sh @@ -92,6 +92,31 @@ channel-destroy (ch) 24 ## END +#### channel but backed by blocked pipe +source --builtin draft-sync.ysh + +setglobal block_size = 4 +func blockPipeInWrap() { + blocked-netstring-pipe-in (block_size) +} +func blockPipeOutWrap() { + blocked-netstring-pipe-out (block_size) +} + +channel-new (&ch, blockPipeInWrap, blockPipeOutWrap) + +var sent = "I-am-a-pretty-damn-long-string-that-need-to-be-blocked" + +fork { + write -n -- "$sent" | channel-in (ch) +} + +var received = $(channel-out (ch)) +echo $[received === sent] +## STDOUT: +true +## END + #### RWLock multiple shared lock and free, one exclusive lock source --builtin draft-sync.ysh diff --git a/stdlib/draft-pipe.ysh b/stdlib/draft-pipe.ysh index 2bf03af25f..4716bdfedc 100644 --- a/stdlib/draft-pipe.ysh +++ b/stdlib/draft-pipe.ysh @@ -10,7 +10,7 @@ const __pipe_NUM_DELIM = u'$' # It would be log_128(n) (using 1 bit for indicating the number ends) # That's a ln 128 / ln 10 which is roughly twice less overhead. proc delim-num-pipe-in (; num) { - write -n $num$__pipe_NUM_DELIM + write -n -- "$num$__pipe_NUM_DELIM" } proc delim-num-pipe-out (; out_num) { @@ -26,31 +26,26 @@ proc delim-num-pipe-out (; out_num) { call out_num->setValue(num_str => int()) } -proc blocked-pipe-in (; block_size) { +proc blocked-netstring-pipe-in (; block_size) { while (true) { - read -n $block_size chunk - var last_chunk = len(chunk) !== block_size + var chunk + try { + read -n $block_size chunk + } + var last_chunk = (_status !== 0) + write -n -- "$chunk" | netstring-pipe-in if (last_chunk) { - write -n 1 - delim-num-pipe-in (len(last_chunk)) - write -n ${chunk} break - } else { - write -n 0 - write -n ${chunk} } } } -proc blocked-pipe-out (; block_size) { +proc blocked-netstring-pipe-out (; block_size) { while (true) { - read -n 1 last_chunk - if (last_chunk) { - delim-num-pipe-out (&len_last_chunk) - read -n $len_last_chunk + var chunk = $(netstring-pipe-out) + write -n -- "$chunk" + if (len(chunk) !== block_size) { break - } else { - read -n $block_size chunk } } } @@ -59,13 +54,13 @@ proc blocked-pipe-out (; block_size) { proc netstring-pipe-in () { var msg = $(cat) # consume everything from input delim-num-pipe-in (len(msg)) - write -n $msg + write -n -- "$msg" } proc netstring-pipe-out() { delim-num-pipe-out (&msg_len) read -n $msg_len msg - write -n $msg + write -n -- "$msg" } # NOTE: No way to refer to procs, I wrap them around funcs for now. diff --git a/stdlib/draft-sync.ysh b/stdlib/draft-sync.ysh index 80a77c0fae..822e1ef46c 100644 --- a/stdlib/draft-sync.ysh +++ b/stdlib/draft-sync.ysh @@ -167,7 +167,7 @@ proc atom-destroy(; lock) { proc atom-swap-fn(; lock, fn) { atom-lock-exclusive (lock) var swapped = $(atom-read-out (lock)) => fn() - write -n $swapped | atom-write-in (lock) + write -n -- "$swapped" | atom-write-in (lock) atom-unlock (lock) } @@ -188,7 +188,7 @@ proc exh-channel-new(; out_chan, pipe_in = __pipe_netstring_pipe_in_wrap, pipe_o # a lock indicating if there's upcoming writes atom-new (&will_write) atom-lock-exclusive (message_count) - write -n 0 | atom-write-in (message_count) + write -n -- 0 | atom-write-in (message_count) atom-unlock (message_count) call out_chan->setValue({ write_lock, @@ -240,7 +240,7 @@ proc exh-channel-exhaust(; chan, out_ret) { # Count how many message we have atom-lock-exclusive (chan.message_count) atom-read-out (chan.message_count) | json8 read (&num_msg) - write -n 0 | atom-write-in (chan.message_count) + write -n -- 0 | atom-write-in (chan.message_count) atom-unlock (chan.message_count) var ret = [] From b73ac9249166f9606b73d6fb9db49be3055e00af Mon Sep 17 00:00:00 2001 From: glyh Date: Tue, 9 Apr 2024 21:35:32 +0800 Subject: [PATCH 52/52] pass interface as dict --- spec/ysh-stdlib-sync.test.sh | 10 +------ stdlib/draft-pipe.ysh | 58 +++++++++++++++++++++++------------- stdlib/draft-sync.ysh | 20 ++++++------- 3 files changed, 48 insertions(+), 40 deletions(-) diff --git a/spec/ysh-stdlib-sync.test.sh b/spec/ysh-stdlib-sync.test.sh index 0d0eba6257..6be6b61a1a 100644 --- a/spec/ysh-stdlib-sync.test.sh +++ b/spec/ysh-stdlib-sync.test.sh @@ -95,15 +95,7 @@ channel-destroy (ch) #### channel but backed by blocked pipe source --builtin draft-sync.ysh -setglobal block_size = 4 -func blockPipeInWrap() { - blocked-netstring-pipe-in (block_size) -} -func blockPipeOutWrap() { - blocked-netstring-pipe-out (block_size) -} - -channel-new (&ch, blockPipeInWrap, blockPipeOutWrap) +channel-new (&ch, __pipe_methods_blocked_netstring(4)) var sent = "I-am-a-pretty-damn-long-string-that-need-to-be-blocked" diff --git a/stdlib/draft-pipe.ysh b/stdlib/draft-pipe.ysh index 4716bdfedc..88f93338cc 100644 --- a/stdlib/draft-pipe.ysh +++ b/stdlib/draft-pipe.ysh @@ -26,6 +26,33 @@ proc delim-num-pipe-out (; out_num) { call out_num->setValue(num_str => int()) } + +# Netstring {{{ +proc netstring-pipe-in () { + var msg = $(cat) # consume everything from input + delim-num-pipe-in (len(msg)) + write -n -- "$msg" +} + +proc netstring-pipe-out() { + delim-num-pipe-out (&msg_len) + read -n $msg_len msg + write -n -- "$msg" +} + +# NOTE: No way to refer to procs, I wrap them around funcs for now. +func __pipe_netstring_pipe_in_wrap() { + netstring-pipe-in +} +func __pipe_netstring_pipe_out_wrap() { + netstring-pipe-out +} +const Pipe_Methods_NetString = { + in_pipe: __pipe_netstring_pipe_in_wrap, + out_pipe: __pipe_netstring_pipe_out_wrap, +} +# }}} +# Blocked NetString {{{ proc blocked-netstring-pipe-in (; block_size) { while (true) { var chunk @@ -50,29 +77,18 @@ proc blocked-netstring-pipe-out (; block_size) { } } -# Netstring {{{ -proc netstring-pipe-in () { - var msg = $(cat) # consume everything from input - delim-num-pipe-in (len(msg)) - write -n -- "$msg" -} - -proc netstring-pipe-out() { - delim-num-pipe-out (&msg_len) - read -n $msg_len msg - write -n -- "$msg" -} -# NOTE: No way to refer to procs, I wrap them around funcs for now. -func __pipe_netstring_pipe_in_wrap() { - netstring-pipe-in +func __pipe_blocked_netstring_pipe_in_wrap(block_size) { + blocked-netstring-pipe-in (block_size) } -func __pipe_netstring_pipe_out_wrap() { - netstring-pipe-out +func __pipe_blocked_pipe_out_wrap(block_size) { + blocked-netstring-pipe-out (block_size) } -# }}} -const Pipe_Methods_NetString = { - in_pipe: __pipe_netstring_pipe_in_wrap, - out_pipe: __pipe_netstring_pipe_out_wrap, +func __pipe_methods_blocked_netstring(blocksize) { + return ({ + in_pipe: __pipe_netstring_pipe_in_wrap, + out_pipe: __pipe_netstring_pipe_out_wrap, + }) } +# }}} diff --git a/stdlib/draft-sync.ysh b/stdlib/draft-sync.ysh index 822e1ef46c..2123e4f6a5 100644 --- a/stdlib/draft-sync.ysh +++ b/stdlib/draft-sync.ysh @@ -56,9 +56,12 @@ proc mutex-destroy(; mutex) { # - Backed by a modified net-string implementation # - We may may modify this to replace with an implementation that allows streaming, however. -proc channel-new(;out_chan, pipe_in = __pipe_netstring_pipe_in_wrap, pipe_out = __pipe_netstring_pipe_out_wrap, ) { +proc channel-new(;out_chan, methods = null) { # NOTE: Wait for dicts to be able to be set for default fields # proc channel-new(;out_chan, methods = Pipe_Methods_NetString) { + if (methods === null) { + setvar methods = Pipe_Methods_NetString + } mutex-new (&write_lock) mutex-new (&read_lock) fifo-fd-new (&pipe) @@ -66,10 +69,7 @@ proc channel-new(;out_chan, pipe_in = __pipe_netstring_pipe_in_wrap, pipe_out = write_lock, read_lock, pipe, - methods: { - in_pipe: pipe_in, - out_pipe: pipe_out, - }, + methods, }) } @@ -179,7 +179,10 @@ proc atom-swap-fn(; lock, fn) { # Exhaustable Channels {{{ # Channels but exhaustable -proc exh-channel-new(; out_chan, pipe_in = __pipe_netstring_pipe_in_wrap, pipe_out = __pipe_netstring_pipe_out_wrap) { +proc exh-channel-new(; out_chan, methods = null) { + if (methods === null) { + setvar methods = Pipe_Methods_NetString + } mutex-new (&write_lock) mutex-new (&read_lock) fifo-fd-new (&pipe) @@ -194,10 +197,7 @@ proc exh-channel-new(; out_chan, pipe_in = __pipe_netstring_pipe_in_wrap, pipe_o write_lock, read_lock, pipe, - methods: { - in_pipe: pipe_in, - out_pipe: pipe_out, - }, + methods, message_count, will_write, })