diff --git a/quasar/quasar.cabal b/quasar/quasar.cabal index b231f6a..a251935 100644 --- a/quasar/quasar.cabal +++ b/quasar/quasar.cabal @@ -75,7 +75,6 @@ library Quasar.MonadQuasar.Misc Quasar.Observable.AccumulatingObserver Quasar.Observable.Async - Quasar.Observable.Cache Quasar.Observable.Core Quasar.Observable.Lift Quasar.Observable.List @@ -83,6 +82,7 @@ library Quasar.Observable.Map Quasar.Observable.ObservableVar Quasar.Observable.Set + Quasar.Observable.Share Quasar.Observable.Subject Quasar.Observable.Traversable Quasar.Pool diff --git a/quasar/src/Quasar/Observable/Cache.hs b/quasar/src/Quasar/Observable/Cache.hs deleted file mode 100644 index 7f3508d..0000000 --- a/quasar/src/Quasar/Observable/Cache.hs +++ /dev/null @@ -1,138 +0,0 @@ -{-# LANGUAGE CPP #-} -{-# LANGUAGE UndecidableInstances #-} - -module Quasar.Observable.Cache ( - -- TODO move to Observable module - cacheObservable, - observeCachedObservable, - - cacheObservableT, - - -- ** Observable operation type - CachedObservable, -) where - -import Control.Applicative -import Data.Functor.Identity -import Quasar.Disposer -import Quasar.Observable.Core -import Quasar.Prelude -import Quasar.Utils.CallbackRegistry - --- * Cache - -newtype CachedObservable canLoad exceptions c v = CachedObservable (TVar (CacheState canLoad exceptions c v)) - -data CacheState canLoad exceptions c v - = forall a. IsObservableCore canLoad exceptions c v a => CacheIdle a - | forall a. IsObservableCore canLoad exceptions c v a - => CacheAttached - a - TDisposer - (CallbackRegistry (EvaluatedObservableChange canLoad (ObservableResult exceptions c) v)) - (ObserverState canLoad (ObservableResult exceptions c) v) - -instance (ObservableContainer c v, ContainerConstraint canLoad exceptions c v (CachedObservable canLoad exceptions c v)) => ToObservableT canLoad exceptions c v (CachedObservable canLoad exceptions c v) where - toObservableT = ObservableT - -instance ObservableContainer c v => IsObservableCore canLoad exceptions c v (CachedObservable canLoad exceptions c v) where - readObservable# (CachedObservable var) = do - readTVar var >>= \case - CacheIdle x -> readObservable# x - CacheAttached _x _disposer _registry state -> pure (toObservableState state) - - attachEvaluatedObserver# (CachedObservable var) callback = do - readTVar var >>= \case - CacheIdle upstream -> do - registry <- newCallbackRegistryWithEmptyCallback removeCacheListener - (upstreamDisposer, state) <- attachEvaluatedObserver# upstream updateCache - writeTVar var (CacheAttached upstream upstreamDisposer registry (createObserverState state)) - disposer <- registerCallback registry callback - pure (disposer, state) - CacheAttached _ _ registry state -> do - case state of - -- The cached state can't be propagated downstream, so the first - -- callback invocation must not send a Delta or a LiveUnchanged. - -- The callback is wrapped to change them to a Replace. - ObserverStateLoadingCached cache -> do - disposer <- registerCallbackChangeAfterFirstCall registry (callback . fixInvalidCacheState cache) callback - pure (disposer, ObservableStateLoading) - _ -> do - disposer <- registerCallback registry callback - pure (disposer, toObservableState state) - where - removeCacheListener :: STMc NoRetry '[] () - removeCacheListener = do - readTVar var >>= \case - CacheIdle _ -> unreachableCodePath - CacheAttached upstream upstreamDisposer _ _ -> do - writeTVar var (CacheIdle upstream) - disposeTDisposer upstreamDisposer - updateCache :: EvaluatedObservableChange canLoad (ObservableResult exceptions c) v -> STMc NoRetry '[] () - updateCache change = do - readTVar var >>= \case - CacheIdle _ -> unreachableCodePath - CacheAttached upstream upstreamDisposer registry oldState -> do - let mstate = applyEvaluatedObservableChange change oldState - forM_ mstate \state -> do - writeTVar var (CacheAttached upstream upstreamDisposer registry state) - callCallbacks registry change - - isCachedObservable# _ = True - --- Precondition: Observer is in `ObserverStateLoadingCleared` state, but caller --- assumes the observer is in `ObserverStateLoadingCached` state. -fixInvalidCacheState :: - ObservableResult exceptions c v -> - EvaluatedObservableChange Load (ObservableResult exceptions c) v -> - EvaluatedObservableChange Load (ObservableResult exceptions c) v -fixInvalidCacheState _cached EvaluatedObservableChangeLoadingClear = - EvaluatedObservableChangeLoadingClear -fixInvalidCacheState cached EvaluatedObservableChangeLiveUnchanged = - EvaluatedObservableChangeLiveReplace cached -fixInvalidCacheState _cached replace@(EvaluatedObservableChangeLiveReplace _) = - replace -fixInvalidCacheState _cached (EvaluatedObservableChangeLiveDelta delta) = - EvaluatedObservableChangeLiveReplace (contentFromEvaluatedDelta delta) -fixInvalidCacheState _cached EvaluatedObservableChangeLoadingUnchanged = - -- Filtered by `applyEvaluatedObservableChange` in `updateCache` - impossibleCodePath - -cacheObservable :: - MonadSTMc NoRetry '[] m => - Observable canLoad exceptions v -> m (Observable canLoad exceptions v) -cacheObservable (Observable f) = Observable <$> cacheObservableT f - -cacheObservableT :: - ( - ObservableContainer c v, - ContainerConstraint canLoad exceptions c v (CachedObservable canLoad exceptions c v), - MonadSTMc NoRetry '[] m - ) => - ObservableT canLoad exceptions c v -> m (ObservableT canLoad exceptions c v) -cacheObservableT f = - if isCachedObservable# f - then pure f - else ObservableT . CachedObservable <$> newTVar (CacheIdle f) - - --- ** Embedded cache in the Observable monad - -newtype CacheObservableOperation canLoad exceptions l e v = CacheObservableOperation (Observable l e v) - -instance ToObservableT canLoad exceptions Identity (Observable l e v) (CacheObservableOperation canLoad exceptions l e v) where - toObservableT = ObservableT - -instance IsObservableCore canLoad exceptions Identity (Observable l e v) (CacheObservableOperation canLoad exceptions l e v) where - readObservable# (CacheObservableOperation x) = do - cache <- cacheObservable x - pure (pure cache) - attachObserver# (CacheObservableOperation x) _callback = do - cache <- cacheObservable x - pure (mempty, ObservableStateLive (pure cache)) - --- | Cache an observable in the `Observable` monad. Use with care! A new cache --- is created for every outer observable evaluation. -observeCachedObservable :: forall canLoad exceptions e l v a. ToObservable l e v a => a -> Observable canLoad exceptions (Observable l e v) -observeCachedObservable x = - toObservable (CacheObservableOperation @canLoad @exceptions (toObservable x)) diff --git a/quasar/src/Quasar/Observable/Core.hs b/quasar/src/Quasar/Observable/Core.hs index 40e47f8..5c46dcc 100644 --- a/quasar/src/Quasar/Observable/Core.hs +++ b/quasar/src/Quasar/Observable/Core.hs @@ -167,8 +167,8 @@ class IsObservableCore canLoad exceptions c v a | a -> canLoad, a -> exceptions, pure ((disposer, initial), createObserverState initial) - isCachedObservable# :: a -> Bool - isCachedObservable# _ = False + isSharedObservable# :: a -> Bool + isSharedObservable# _ = False mapObservable# :: ObservableFunctor c => (v -> vb) -> a -> MappedObservable canLoad exceptions c vb mapObservable# f x = MappedObservable f x @@ -220,7 +220,7 @@ instance IsObservableCore canLoad exceptions c v (ObservableT canLoad exceptions readObservable# (ObservableT x) = readObservable# x attachObserver# (ObservableT x) = attachObserver# x attachEvaluatedObserver# (ObservableT x) = attachEvaluatedObserver# x - isCachedObservable# (ObservableT x) = isCachedObservable# x + isSharedObservable# (ObservableT x) = isSharedObservable# x mapObservable# f (ObservableT x) = mapObservable# f x count# (ObservableT x) = count# x isEmpty# (ObservableT x) = isEmpty# x @@ -686,7 +686,7 @@ instance Semigroup (c v) => Semigroup (ObservableState canLoad c v) where instance IsObservableCore canLoad exceptions c v (ObservableState canLoad (ObservableResult exceptions c) v) where readObservable# = pure attachObserver# x _callback = pure (mempty, x) - isCachedObservable# _ = True + isSharedObservable# _ = True count# x = constObservable (mapObservableStateResult (Identity . containerCount#) x) isEmpty# x = constObservable (mapObservableStateResult (Identity . containerIsEmpty#) x) @@ -1632,7 +1632,7 @@ instance IsObservableCore canLoad exceptions Identity v (Observable canLoad exce readObservable# (Observable x) = readObservable# x attachObserver# (Observable x) = attachObserver# x attachEvaluatedObserver# (Observable x) = attachEvaluatedObserver# x - isCachedObservable# (Observable x) = isCachedObservable# x + isSharedObservable# (Observable x) = isSharedObservable# x mapObservable# f (Observable x) = mapObservable# f x count# (Observable x) = count# x isEmpty# (Observable x) = isEmpty# x diff --git a/quasar/src/Quasar/Observable/List.hs b/quasar/src/Quasar/Observable/List.hs index 75b3132..377aec4 100644 --- a/quasar/src/Quasar/Observable/List.hs +++ b/quasar/src/Quasar/Observable/List.hs @@ -12,7 +12,7 @@ module Quasar.Observable.List ( validatedListDeltaLength, ListDeltaOperation(..), Length(..), - cache, + share, -- * Reexports FingerTree, @@ -59,8 +59,8 @@ import Data.Sequence (Seq(Empty)) import Data.Sequence qualified as Seq import Data.Traversable qualified as Traversable import Quasar.Disposer (TDisposer) -import Quasar.Observable.Cache import Quasar.Observable.Core +import Quasar.Observable.Share import Quasar.Observable.Subject import Quasar.Observable.Traversable import Quasar.Prelude hiding (traverse) @@ -342,7 +342,7 @@ instance IsObservableCore canLoad exceptions Seq v (ObservableList canLoad excep readObservable# (ObservableList x) = readObservable# x attachObserver# (ObservableList x) = attachObserver# x attachEvaluatedObserver# (ObservableList x) = attachEvaluatedObserver# x - isCachedObservable# (ObservableList x) = isCachedObservable# x + isSharedObservable# (ObservableList x) = isSharedObservable# x instance IsObservableList canLoad exceptions v (ObservableList canLoad exceptions v) where --member# (ObservableList (ObservableT x)) = member# x @@ -392,13 +392,13 @@ isEmpty :: ObservableList l e v -> Observable l e Bool isEmpty fx = isEmpty# fx -instance IsObservableList canLoad exceptions v (CachedObservable canLoad exceptions Seq v) where +instance IsObservableList canLoad exceptions v (SharedObservable canLoad exceptions Seq v) where -cache :: +share :: MonadSTMc NoRetry '[] m => ObservableList canLoad exceptions v -> m (ObservableList canLoad exceptions v) -cache (ObservableList f) = ObservableList <$> cacheObservableT f +share (ObservableList f) = ObservableList <$> shareObservableT f constObservableList :: ObservableState canLoad (ObservableResult exceptions Seq) v -> ObservableList canLoad exceptions v diff --git a/quasar/src/Quasar/Observable/Map.hs b/quasar/src/Quasar/Observable/Map.hs index aa05cc2..c028835 100644 --- a/quasar/src/Quasar/Observable/Map.hs +++ b/quasar/src/Quasar/Observable/Map.hs @@ -11,7 +11,7 @@ module Quasar.Observable.Map ( liftObservableMap, IsObservableMap(..), query, - cache, + share, -- ** Delta types MapDelta(..), @@ -85,11 +85,11 @@ import Data.Sequence qualified as Seq import Data.Traversable qualified as Traversable import GHC.Records (HasField (..)) import Quasar.Disposer -import Quasar.Observable.Cache import Quasar.Observable.Core import Quasar.Observable.Lift import Quasar.Observable.List (ObservableList(..), IsObservableList, ListOperation(..)) import Quasar.Observable.List qualified as ObservableList +import Quasar.Observable.Share import Quasar.Observable.Subject import Quasar.Observable.Traversable import Quasar.Prelude hiding (filter, lookup, traverse) @@ -206,7 +206,7 @@ instance IsObservableCore canLoad exceptions (Map k) v (ObservableMap canLoad ex readObservable# (ObservableMap (ObservableT x)) = readObservable# x attachObserver# (ObservableMap x) = attachObserver# x attachEvaluatedObserver# (ObservableMap x) = attachEvaluatedObserver# x - isCachedObservable# (ObservableMap (ObservableT x)) = isCachedObservable# x + isSharedObservable# (ObservableMap (ObservableT x)) = isSharedObservable# x instance IsObservableMap canLoad exceptions k v (ObservableMap canLoad exceptions k v) where lookupKey# (ObservableMap x) = lookupKey# x @@ -234,14 +234,14 @@ query x = query# (toObservableMap x) -instance Ord k => IsObservableMap canLoad exceptions k v (CachedObservable canLoad exceptions (Map k) v) where +instance Ord k => IsObservableMap canLoad exceptions k v (SharedObservable canLoad exceptions (Map k) v) where -- TODO -cache :: +share :: (Ord k, MonadSTMc NoRetry '[] m) => ObservableMap canLoad exceptions k v -> m (ObservableMap canLoad exceptions k v) -cache (ObservableMap f) = ObservableMap <$> cacheObservableT f +share (ObservableMap f) = ObservableMap <$> shareObservableT f diff --git a/quasar/src/Quasar/Observable/Set.hs b/quasar/src/Quasar/Observable/Set.hs index 9232555..2b2832b 100644 --- a/quasar/src/Quasar/Observable/Set.hs +++ b/quasar/src/Quasar/Observable/Set.hs @@ -53,7 +53,7 @@ instance Ord v => IsObservableCore canLoad exceptions Set v (ObservableSet canLo readObservable# (ObservableSet x) = readObservable# x attachObserver# (ObservableSet x) = attachObserver# x attachEvaluatedObserver# (ObservableSet x) = attachEvaluatedObserver# x - isCachedObservable# (ObservableSet x) = isCachedObservable# x + isSharedObservable# (ObservableSet x) = isSharedObservable# x instance Ord v => IsObservableSet canLoad exceptions v (ObservableSet canLoad exceptions v) where --member# (ObservableSet x) = member# x diff --git a/quasar/src/Quasar/Observable/Share.hs b/quasar/src/Quasar/Observable/Share.hs new file mode 100644 index 0000000..1780aa8 --- /dev/null +++ b/quasar/src/Quasar/Observable/Share.hs @@ -0,0 +1,138 @@ +{-# LANGUAGE CPP #-} +{-# LANGUAGE UndecidableInstances #-} + +module Quasar.Observable.Share ( + -- TODO move to Observable module + shareObservable, + observeSharedObservable, + + shareObservableT, + + -- ** Observable operation type + SharedObservable, +) where + +import Control.Applicative +import Data.Functor.Identity +import Quasar.Disposer +import Quasar.Observable.Core +import Quasar.Prelude +import Quasar.Utils.CallbackRegistry + +-- * Share + +newtype SharedObservable canLoad exceptions c v = SharedObservable (TVar (ShareState canLoad exceptions c v)) + +data ShareState canLoad exceptions c v + = forall a. IsObservableCore canLoad exceptions c v a => ShareIdle a + | forall a. IsObservableCore canLoad exceptions c v a + => ShareAttached + a + TDisposer + (CallbackRegistry (EvaluatedObservableChange canLoad (ObservableResult exceptions c) v)) + (ObserverState canLoad (ObservableResult exceptions c) v) + +instance (ObservableContainer c v, ContainerConstraint canLoad exceptions c v (SharedObservable canLoad exceptions c v)) => ToObservableT canLoad exceptions c v (SharedObservable canLoad exceptions c v) where + toObservableT = ObservableT + +instance ObservableContainer c v => IsObservableCore canLoad exceptions c v (SharedObservable canLoad exceptions c v) where + readObservable# (SharedObservable var) = do + readTVar var >>= \case + ShareIdle x -> readObservable# x + ShareAttached _x _disposer _registry state -> pure (toObservableState state) + + attachEvaluatedObserver# (SharedObservable var) callback = do + readTVar var >>= \case + ShareIdle upstream -> do + registry <- newCallbackRegistryWithEmptyCallback removeShareListener + (upstreamDisposer, state) <- attachEvaluatedObserver# upstream updateShare + writeTVar var (ShareAttached upstream upstreamDisposer registry (createObserverState state)) + disposer <- registerCallback registry callback + pure (disposer, state) + ShareAttached _ _ registry state -> do + case state of + -- The shared state can't be propagated downstream, so the first + -- callback invocation must not send a Delta or a LiveUnchanged. + -- The callback is wrapped to change them to a Replace. + ObserverStateLoadingCached cache -> do + disposer <- registerCallbackChangeAfterFirstCall registry (callback . fixInvalidShareState cache) callback + pure (disposer, ObservableStateLoading) + _ -> do + disposer <- registerCallback registry callback + pure (disposer, toObservableState state) + where + removeShareListener :: STMc NoRetry '[] () + removeShareListener = do + readTVar var >>= \case + ShareIdle _ -> unreachableCodePath + ShareAttached upstream upstreamDisposer _ _ -> do + writeTVar var (ShareIdle upstream) + disposeTDisposer upstreamDisposer + updateShare :: EvaluatedObservableChange canLoad (ObservableResult exceptions c) v -> STMc NoRetry '[] () + updateShare change = do + readTVar var >>= \case + ShareIdle _ -> unreachableCodePath + ShareAttached upstream upstreamDisposer registry oldState -> do + let mstate = applyEvaluatedObservableChange change oldState + forM_ mstate \state -> do + writeTVar var (ShareAttached upstream upstreamDisposer registry state) + callCallbacks registry change + + isSharedObservable# _ = True + +-- Precondition: Observer is in `ObserverStateLoadingCleared` state, but caller +-- assumes the observer is in `ObserverStateLoadingShared` state. +fixInvalidShareState :: + ObservableResult exceptions c v -> + EvaluatedObservableChange Load (ObservableResult exceptions c) v -> + EvaluatedObservableChange Load (ObservableResult exceptions c) v +fixInvalidShareState _shared EvaluatedObservableChangeLoadingClear = + EvaluatedObservableChangeLoadingClear +fixInvalidShareState shared EvaluatedObservableChangeLiveUnchanged = + EvaluatedObservableChangeLiveReplace shared +fixInvalidShareState _shared replace@(EvaluatedObservableChangeLiveReplace _) = + replace +fixInvalidShareState _shared (EvaluatedObservableChangeLiveDelta delta) = + EvaluatedObservableChangeLiveReplace (contentFromEvaluatedDelta delta) +fixInvalidShareState _shared EvaluatedObservableChangeLoadingUnchanged = + -- Filtered by `applyEvaluatedObservableChange` in `updateShare` + impossibleCodePath + +shareObservable :: + MonadSTMc NoRetry '[] m => + Observable canLoad exceptions v -> m (Observable canLoad exceptions v) +shareObservable (Observable f) = Observable <$> shareObservableT f + +shareObservableT :: + ( + ObservableContainer c v, + ContainerConstraint canLoad exceptions c v (SharedObservable canLoad exceptions c v), + MonadSTMc NoRetry '[] m + ) => + ObservableT canLoad exceptions c v -> m (ObservableT canLoad exceptions c v) +shareObservableT f = + if isSharedObservable# f + then pure f + else ObservableT . SharedObservable <$> newTVar (ShareIdle f) + + +-- ** Embedded share in the Observable monad + +newtype ShareObservableOperation canLoad exceptions l e v = ShareObservableOperation (Observable l e v) + +instance ToObservableT canLoad exceptions Identity (Observable l e v) (ShareObservableOperation canLoad exceptions l e v) where + toObservableT = ObservableT + +instance IsObservableCore canLoad exceptions Identity (Observable l e v) (ShareObservableOperation canLoad exceptions l e v) where + readObservable# (ShareObservableOperation x) = do + share <- shareObservable x + pure (pure share) + attachObserver# (ShareObservableOperation x) _callback = do + share <- shareObservable x + pure (mempty, ObservableStateLive (pure share)) + +-- | Share an observable in the `Observable` monad. Use with care! A new share +-- is created for every outer observable evaluation. +observeSharedObservable :: forall canLoad exceptions e l v a. ToObservable l e v a => a -> Observable canLoad exceptions (Observable l e v) +observeSharedObservable x = + toObservable (ShareObservableOperation @canLoad @exceptions (toObservable x))