implement merge for channels #154

Open
dpetranek opened this issue Aug 3, 2024 · 0 comments

I've been using and enjoying the csp channels. However, I ran into a missing function while working with a csp/mult. clojure.core.async has a merge function that takes n channels and merges their values, as they become available, onto a single output channel, and I think there should be an analogue in promesa.exec.csp.
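
For reference, here is a minimal sketch of the clojure.core.async behaviour I mean, assuming core.async is on the classpath. The channel names and contents are made up for illustration; values from different source channels may interleave in any order.

```clojure
(require '[clojure.core.async :as a])

;; two source channels that each close once their contents are consumed
(def c1 (a/to-chan! [1 2 3]))
(def c2 (a/to-chan! [:a :b]))

;; merge returns a new channel that closes after all sources have closed
(def merged (a/merge [c1 c2]))

;; drain the merged channel into a vector (blocks until merged closes)
(def result (a/<!! (a/into [] merged)))
```

After this runs, result holds all five values; only the relative order within each source channel is preserved.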

Here's a rough sketch of how I'm using it:

(def north-xf (map (fn [[x y]] [:north x y])))
(def south-xf (map (fn [[x y]] [:south x y])))

;; this is the source chan, a series of [x y] coordinates
(def coords-ch (csp/chan))
;; create a mult from the source chan
(def mx (csp/mult* coords-ch))
;; tap values from the mult into the respective work chans
(def north (let [ch (csp/chan :buf 1 :xf north-xf)]
               (csp/tap! mx ch)))
(def south (let [ch (csp/chan :buf 1 :xf south-xf)]
               (csp/tap! mx ch)))

;; load up the source chan with coordinates
(csp/onto-chan! coords-ch (partition-all 2 (range 10)))

;; this is what I want to do to consolidate all the work done:
(merge [north south])

I've come up with my own version of merge (taking inspiration from clojure.core.async) that seems to be working fine on the JVM; I haven't had to use it in CLJS yet.

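(require '[promesa.exec.csp :as csp])

;; NOTE: this name shadows clojure.core/merge in the current namespace.
(defn merge
  "Takes a collection of source channels and returns a channel that
  receives every value taken from them. The returned channel closes
  once all source channels have closed. `opts` is passed to csp/chan."
  [chans & {:keys [buf xf exh exc] :as opts}]
  (let [out (csp/chan opts)]
    (csp/go-loop [cs (vec chans)]
      (if (pos? (count cs))
        ;; wait for whichever source channel yields first
        (let [[v ch] (csp/alts! cs)]
          (if (nil? v)
            ;; nil means ch has closed: drop it and keep going
            (recur (filterv #(not= ch %) cs))
            (do (csp/put! out v)
                (recur cs))))
        ;; no sources left
        (csp/close! out)))
    out))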

I'd be happy to put together a PR for it if this is the direction you want to go.
