diff --git a/quasar/quasar.cabal b/quasar/quasar.cabal index 9e32ce4..6403809 100644 --- a/quasar/quasar.cabal +++ b/quasar/quasar.cabal @@ -99,11 +99,6 @@ library other-modules: Quasar.Resources.Disposer Quasar.Resources.FutureDisposer - -- Legacy observable v1 - Quasar.Observable - Quasar.Observable.ObservableList - Quasar.Observable.ObservableMap - Quasar.Observable.ObservablePriority hs-source-dirs: src diff --git a/quasar/src/Quasar/Observable.hs b/quasar/src/Quasar/Observable.hs deleted file mode 100644 index a74ffc0..0000000 --- a/quasar/src/Quasar/Observable.hs +++ /dev/null @@ -1,483 +0,0 @@ -{-# LANGUAGE CPP #-} -{-# LANGUAGE UndecidableInstances #-} - -module Quasar.Observable ( - -- * Observable core - Observable, - ToObservable(..), - readObservable, - attachObserver, - mapObservable, - deduplicateObservable, - cacheObservable, - IsObservable(..), - observeSTM, - ObserverCallback, - observe, - - observeQ, - observeQ_, - - -- ** Control flow utilities - observeWith, - observeBlocking, - observeAsync, - - -- * ObservableEx - ObservableEx, - IsObservableEx, - toObservableEx, - limitObservableEx, - - -- * ObservableVar - ObservableVar, - newObservableVar, - newObservableVarIO, - writeObservableVar, - modifyObservableVar, - stateObservableVar, - observableVarHasObservers, -) where - -import Control.Applicative -import Control.Monad (MonadPlus) -import Control.Monad.Catch -import Data.Coerce (coerce) -import Data.String (IsString(..)) -import Quasar.Async -import Quasar.Exceptions -import Quasar.Future -import Quasar.MonadQuasar -import Quasar.Prelude -import Quasar.Resources.Disposer -import Quasar.Utils.CallbackRegistry -import Quasar.Utils.Fix - -type ObserverCallback a = a -> STMc NoRetry '[] () - -class ToObservable r a | a -> r where - toObservable :: a -> Observable r - default toObservable :: IsObservable r a => a -> Observable r - toObservable = Observable - -class ToObservable r a => IsObservable r a | a -> r where - {-# MINIMAL attachObserver#, readObservable# #-} - - readObservable# :: a -> STMc NoRetry '[] r - - -- | Register a callback to observe changes. The callback is called when the - -- value changes, but depending on the observable implementation intermediate - -- values may be skipped. - -- - -- The implementation of `attachObserver#` MUST NOT call the callback during - -- registration. - -- - -- The implementation of `attachObserver#` MUST NOT directly or indirectly - -- update an observable during the current STM transaction. Only working with - -- `TVar`s and calling `registerCallback` is guaranteed to be safe. - attachObserver# :: a -> ObserverCallback r -> STMc NoRetry '[] (TDisposer, r) - - mapObservable# :: (r -> r2) -> a -> Observable r2 - mapObservable# f o = Observable (MappedObservable f (toObservable o)) - - cacheObservable# :: a -> STMc NoRetry '[] (Observable r) - cacheObservable# o = Observable <$> newCachedObservable (toObservable o) - -readObservable :: (ToObservable r a, MonadSTMc NoRetry '[] m) => a -> m r -readObservable o = liftSTMc $ readObservable# (toObservable o) - -attachObserver :: (ToObservable r a, MonadSTMc NoRetry '[] m) => a -> ObserverCallback r -> m (TDisposer, r) -attachObserver o callback = liftSTMc $ attachObserver# (toObservable o) callback - -mapObservable :: ToObservable r a => (r -> r2) -> a -> Observable r2 -mapObservable f o = mapObservable# f (toObservable o) - -cacheObservable :: (ToObservable r a, MonadSTMc NoRetry '[] m) => a -> m (Observable r) -cacheObservable o = liftSTMc $ cacheObservable# (toObservable o) - --- | The implementation of `observeSTM` will call the callback during --- registration. -observeSTM :: (ToObservable r a, MonadSTMc NoRetry '[] m) => a -> ObserverCallback r -> m TDisposer -observeSTM observable callback = liftSTMc do - (disposer, initial) <- attachObserver# (toObservable observable) callback - callback initial - pure disposer - - -instance ToObservable (Maybe r) (Future '[] r) - -instance IsObservable (Maybe r) (Future '[] r) where - readObservable# future = orElseNothing @'[] (fromEitherEx <$> readFuture future) - - attachObserver# future callback = do - readOrAttachToFuture future (callback . Just . fromEitherEx) >>= \case - Left disposer -> pure (disposer, Nothing) - Right (RightAbsurdEx value) -> pure (mempty, Just value) - - cacheObservable# future = toObservable <$> cacheFuture# future - -observe - :: (ResourceCollector m, MonadSTMc NoRetry '[] m) - => Observable a - -> (a -> STMc NoRetry '[] ()) -- ^ callback - -> m () -observe observable callback = do - disposer <- observeSTM observable callback - collectResource disposer - -observeQ - :: (MonadQuasar m, MonadSTMc NoRetry '[SomeException] m) - => Observable a - -> (a -> STMc NoRetry '[SomeException] ()) -- ^ callback - -> m TDisposer -observeQ observable callbackFn = do - sink <- askExceptionSink - mfix \disposerFixed -> do - let - wrappedCallback state = callbackFn state `catchAllSTMc` \e -> do - disposeTDisposer disposerFixed - throwToExceptionSink sink e - disposer <- observeSTM observable wrappedCallback - collectResource disposer - pure disposer - -observeQ_ - :: (MonadQuasar m, MonadSTMc NoRetry '[SomeException] m) - => Observable a - -> (a -> STMc NoRetry '[SomeException] ()) -- ^ callback - -> m () -observeQ_ observable callback = void $ observeQ observable callback - - --- | Existential quantification wrapper for the IsObservable type class. -data Observable r = forall a. IsObservable r a => Observable a - -instance ToObservable a (Observable a) where - toObservable = id - -instance IsObservable a (Observable a) where - readObservable# (Observable o) = readObservable# o - attachObserver# (Observable o) = attachObserver# o - mapObservable# f (Observable o) = mapObservable# f o - cacheObservable# (Observable o) = cacheObservable# o - -instance Functor Observable where - fmap f = mapObservable# f - -instance Applicative Observable where - pure value = toObservable (ConstObservable value) - liftA2 fn x y = toObservable $ LiftA2Observable fn x y - -instance Monad Observable where - x >>= f = toObservable $ BindObservable x f - -instance Semigroup a => Semigroup (Observable a) where - x <> y = liftA2 (<>) x y - -instance Monoid a => Monoid (Observable a) where - mempty = pure mempty - -instance IsString a => IsString (Observable a) where - fromString = pure . fromString - - --- | Observe an observable by handling updates on the current thread. --- --- `observeBlocking` will run the handler whenever the observable changes (forever / until an exception is encountered). --- --- The handler is allowed to block. When the value changes while the handler is running the handler will be run again --- after it completes; when the value changes multiple times it will only be executed once (with the latest value). -observeBlocking - :: (MonadQuasar m, MonadIO m, MonadMask m) - => Observable r - -> (r -> m ()) - -> m a -observeBlocking observable handler = do - observeWith observable \fetchNext -> forever do - msg <- atomicallyC fetchNext - handler msg - -observeAsync - :: (MonadQuasar m, MonadIO m) - => Observable r - -> (r -> QuasarIO ()) - -> m (Async a) -observeAsync observable handler = async $ observeBlocking observable handler - - -observeWith - :: (MonadQuasar m, MonadIO m, MonadMask m) - => Observable r - -> (STMc Retry '[] r -> m a) - -> m a -observeWith observable fn = do - var <- liftIO newEmptyTMVarIO - - bracket (aquire var) dispose - \_ -> fn (takeTMVar var) - where - aquire var = quasarAtomicallyC $ observeQ observable \msg -> do - writeTMVar var msg - - --- | Internal control flow exception for `observeWhile` and `observeWhile_`. -data ObserveWhileCompleted = ObserveWhileCompleted - deriving stock (Eq, Show) - - -newtype ConstObservable a = ConstObservable a - -instance ToObservable a (ConstObservable a) - -instance IsObservable a (ConstObservable a) where - attachObserver# (ConstObservable value) _ = pure (mempty, value) - - readObservable# (ConstObservable value) = pure value - - cacheObservable# = pure . toObservable - - -data MappedObservable a = forall b. MappedObservable (b -> a) (Observable b) - -instance ToObservable a (MappedObservable a) - -instance IsObservable a (MappedObservable a) where - attachObserver# (MappedObservable fn observable) callback = fn <<$>> attachObserver# observable (callback . fn) - readObservable# (MappedObservable fn observable) = fn <$> readObservable# observable - mapObservable# f1 (MappedObservable f2 upstream) = toObservable $ MappedObservable (f1 . f2) upstream - - --- | Merge two observables using a given merge function. Whenever one of the inputs is updated, the resulting --- observable updates according to the merge function. --- --- There is no caching involed, every subscriber effectively subscribes to both input observables. -data LiftA2Observable r = forall a b. LiftA2Observable (a -> b -> r) (Observable a) (Observable b) - -instance ToObservable a (LiftA2Observable a) - -instance IsObservable a (LiftA2Observable a) where - attachObserver# (LiftA2Observable fn fx fy) callback = do - mfixExtra \(ixFix, iyFix) -> do - var0 <- newTVar ixFix - var1 <- newTVar iyFix - let callCallback = do - x <- readTVar var0 - y <- readTVar var1 - callback (fn x y) - (dx, ix) <- attachObserver# fx (\update -> writeTVar var0 update >> callCallback) - (dy, iy) <- attachObserver# fy (\update -> writeTVar var1 update >> callCallback) - pure ((dx <> dy, fn ix iy), (ix, iy)) - - readObservable# (LiftA2Observable fn fx fy) = - liftA2 fn (readObservable# fx) (readObservable# fy) - - mapObservable# f1 (LiftA2Observable f2 fx fy) = - toObservable $ LiftA2Observable (\x y -> f1 (f2 x y)) fx fy - - -data BindObservable a = forall b. BindObservable (Observable b) (b -> Observable a) - -instance ToObservable a (BindObservable a) - -instance IsObservable a (BindObservable a) where - attachObserver# (BindObservable fx fn) callback = do - mfixExtra \rightDisposerFix -> do - rightDisposerVar <- newTVar rightDisposerFix - (leftDisposer, ix) <- attachObserver# fx (leftCallback rightDisposerVar) - (rightDisposer, iy) <- attachObserver# (fn ix) callback - - varDisposer <- newTDisposer do - disposeTDisposer =<< swapTVar rightDisposerVar mempty - pure ((leftDisposer <> varDisposer, iy), rightDisposer) - - where - leftCallback rightDisposerVar lmsg = do - disposeTDisposer =<< readTVar rightDisposerVar - rightDisposer <- observeSTM (fn lmsg) callback - writeTVar rightDisposerVar rightDisposer - - readObservable# (BindObservable fx fn) = - readObservable# . fn =<< readObservable# fx - - mapObservable# f (BindObservable fx fn) = - toObservable $ BindObservable fx (f <<$>> fn) - - -data CachedObservable a = CachedObservable (TVar (CacheState a)) - -data CacheState a - = CacheIdle (Observable a) - | CacheAttached (Observable a) TDisposer (CallbackRegistry a) a - -instance ToObservable a (CachedObservable a) - -instance IsObservable a (CachedObservable a) where - readObservable# (CachedObservable var) = do - readTVar var >>= \case - CacheIdle upstream -> readObservable# upstream - CacheAttached _ _ _ value -> pure value - - attachObserver# (CachedObservable var) callback = do - (value, registry) <- readTVar var >>= \case - CacheIdle upstream -> do - registry <- newCallbackRegistryWithEmptyCallback removeCacheListener - (upstreamDisposer, value) <- attachObserver# upstream updateCache - writeTVar var (CacheAttached upstream upstreamDisposer registry value) - pure (value, registry) - CacheAttached _ _ registry value -> - pure (value, registry) - disposer <- registerCallback registry callback - pure (disposer, value) - where - removeCacheListener :: STMc NoRetry '[] () - removeCacheListener = do - readTVar var >>= \case - CacheIdle _ -> unreachableCodePath - CacheAttached upstream upstreamDisposer _ _ -> do - writeTVar var (CacheIdle upstream) - disposeTDisposer upstreamDisposer - updateCache :: a -> STMc NoRetry '[] () - updateCache value = do - readTVar var >>= \case - CacheIdle _ -> unreachableCodePath - CacheAttached upstream upstreamDisposer registry _ -> do - writeTVar var (CacheAttached upstream upstreamDisposer registry value) - callCallbacks registry value - - cacheObservable# f = pure (Observable f) - -newCachedObservable :: Observable a -> STMc NoRetry '[] (CachedObservable a) -newCachedObservable f = CachedObservable <$> newTVar (CacheIdle f) - - -newtype DeduplicatedObservable a = DeduplicatedObservable (Observable a) - -instance Eq a => ToObservable a (DeduplicatedObservable a) - -instance Eq a => IsObservable a (DeduplicatedObservable a) where - readObservable# (DeduplicatedObservable upstream) = readObservable# upstream - attachObserver# (DeduplicatedObservable upstream) callback = - mfixExtra \initialFix -> do - var <- newTVar initialFix - (disposer, initialValue) <- attachObserver# upstream \value -> do - old <- readTVar var - when (old /= value) do - writeTVar var value - callback value - pure ((disposer, initialValue), initialValue) - -deduplicateObservable :: (Eq r, ToObservable r a) => a -> Observable r -deduplicateObservable x = Observable (DeduplicatedObservable (toObservable x)) - - -data ObservableVar a = ObservableVar (TVar a) (CallbackRegistry a) - -instance ToObservable a (ObservableVar a) - -instance IsObservable a (ObservableVar a) where - attachObserver# (ObservableVar var registry) callback = do - disposer <- registerCallback registry callback - value <- readTVar var - pure (disposer, value) - - readObservable# = readObservableVar - - cacheObservable# = pure . toObservable - -newObservableVar :: MonadSTMc NoRetry '[] m => a -> m (ObservableVar a) -newObservableVar x = liftSTMc $ ObservableVar <$> newTVar x <*> newCallbackRegistry - -newObservableVarIO :: MonadIO m => a -> m (ObservableVar a) -newObservableVarIO x = liftIO $ ObservableVar <$> newTVarIO x <*> newCallbackRegistryIO - -writeObservableVar :: MonadSTMc NoRetry '[] m => ObservableVar a -> a -> m () -writeObservableVar (ObservableVar var registry) value = liftSTMc $ do - writeTVar var value - callCallbacks registry value - -readObservableVar :: ObservableVar a -> STMc NoRetry '[] a -readObservableVar (ObservableVar var _) = readTVar var - -modifyObservableVar :: MonadSTMc NoRetry '[] m => ObservableVar a -> (a -> a) -> m () -modifyObservableVar var f = stateObservableVar var (((), ) . f) - -stateObservableVar :: MonadSTMc NoRetry '[] m => ObservableVar a -> (a -> (r, a)) -> m r -stateObservableVar var f = liftSTMc do - oldValue <- readObservableVar var - let (result, newValue) = f oldValue - writeObservableVar var newValue - pure result - -observableVarHasObservers :: MonadSTMc NoRetry '[] m => ObservableVar a -> m Bool -observableVarHasObservers (ObservableVar _ registry) = - callbackRegistryHasCallbacks registry - - --- * ObservableEx - -newtype ObservableEx exceptions a = ObservableEx (Observable (Either (Ex exceptions) a)) - -instance ToObservable (Either (Ex exceptions) a) (ObservableEx exceptions a) where - toObservable (ObservableEx o) = o - -instance Functor (ObservableEx exceptions) where - fmap f (ObservableEx x) = ObservableEx (mapObservable# (fmap f) x) - -instance Applicative (ObservableEx exceptions) where - pure value = ObservableEx $ pure (Right value) - liftA2 fn (ObservableEx x) (ObservableEx y) = - ObservableEx $ liftA2 (liftA2 fn) x y - -instance Monad (ObservableEx exceptions) where - (ObservableEx x) >>= f = ObservableEx $ x >>= \case - (Left ex) -> pure (Left ex) - Right y -> toObservable (f y) - - -instance (Exception e, e :< exceptions) => Throw e (ObservableEx exceptions) where - throwC ex = ObservableEx $ pure (Left (toEx ex)) - -instance MonadThrowEx (ObservableEx exceptions) where - unsafeThrowEx = ObservableEx . pure . Left . unsafeToEx @exceptions - -instance SomeException :< exceptions => MonadThrow (ObservableEx exceptions) where - throwM = throwC . toException - -instance (SomeException :< exceptions, Exception (Ex exceptions)) => MonadCatch (ObservableEx exceptions) where - catch (ObservableEx x) f = ObservableEx $ x >>= \case - left@(Left ex) -> case fromException (toException ex) of - Just matched -> toObservable (f matched) - Nothing -> pure left - Right y -> pure (Right y) - -instance SomeException :< exceptions => MonadFail (ObservableEx exceptions) where - fail = throwM . userError - -instance (SomeException :< exceptions, Exception (Ex exceptions)) => Alternative (ObservableEx exceptions) where - empty = fail "empty" - x <|> y = x `catchAll` const y - -instance (SomeException :< exceptions, Exception (Ex exceptions)) => MonadPlus (ObservableEx exceptions) - -instance Semigroup a => Semigroup (ObservableEx exceptions a) where - x <> y = liftA2 (<>) x y - -instance Monoid a => Monoid (ObservableEx exceptions a) where - mempty = pure mempty - -limitObservableEx :: sub :<< super => ObservableEx sub a -> ObservableEx super a -limitObservableEx (ObservableEx o) = ObservableEx $ coerce <$> o - - -type IsObservableEx exceptions a = IsObservable (Either (Ex exceptions) a) - -toObservableEx :: Observable (Either (Ex exceptions) a) -> ObservableEx exceptions a -toObservableEx = ObservableEx - - --- * Convert Observable to Future - ---observableMatches :: MonadSTMc NoRetry '[] m => (a -> Bool) -> Observable a -> m (Future a) ----- TODO remove monad `m` from signature after reworking the Future to allow callbacks ---observableMatches pred observable = do --- promise <- newPromiseIO diff --git a/quasar/src/Quasar/Observable/ObservableList.hs b/quasar/src/Quasar/Observable/ObservableList.hs deleted file mode 100644 index b01b245..0000000 --- a/quasar/src/Quasar/Observable/ObservableList.hs +++ /dev/null @@ -1,248 +0,0 @@ -module Quasar.Observable.ObservableList ( - -- * ObservableList interface - ToObservableList(..), - attachListDeltaObserver, - IsObservableList(..), - ObservableList, - length, - isEmpty, - - -- ** Deltas - ObservableListDelta(..), - ObservableListOperation(..), - singletonDelta, - packDelta, - - -- * Mutable ObservableListVar container - ObservableListVar, - newObservableListVar, - newObservableListVarIO, - newEmptyObservableListVar, - newEmptyObservableListVarIO, - insert, - delete, - lookup, - lookupDelete, -) where - -import Data.Foldable qualified as Foldable -import Data.Sequence (Seq(..)) -import Data.Sequence qualified as Seq -import Quasar.Observable -import Quasar.Prelude hiding (filter, length) -import Quasar.Resources -import Quasar.Utils.CallbackRegistry - - -class ToObservable (Seq v) a => ToObservableList v a | a -> v where - toObservableList :: a -> ObservableList v - default toObservableList :: IsObservableList v a => a -> ObservableList v - toObservableList = ObservableList - -class ToObservableList v a => IsObservableList v a | a -> v where - observeIsEmpty# :: a -> Observable Bool - - observeLength# :: a -> Observable Int - - -- | Register a listener to observe changes to the whole map. The callback - -- will be invoked with the current state of the map immediately after - -- registering and after that will be invoked for every change to the map. - attachListDeltaObserver# :: a -> (ObservableListDelta v -> STMc NoRetry '[] ()) -> STMc NoRetry '[] (TDisposer, Seq v) - -attachListDeltaObserver :: (ToObservableList v a, MonadSTMc NoRetry '[] m) => a -> (ObservableListDelta v -> STMc NoRetry '[] ()) -> m (TDisposer, Seq v) -attachListDeltaObserver x callback = liftSTMc $ attachListDeltaObserver# (toObservableList x) callback - -length :: ToObservableList v a => a -> Observable Int -length = observeLength# . toObservableList - -isEmpty :: ToObservableList v a => a -> Observable Bool -isEmpty = observeIsEmpty# . toObservableList - --- TODO length, isEmpty - -data ObservableList v = forall a. IsObservableList v a => ObservableList a - -instance ToObservable (Seq v) (ObservableList v) where - toObservable (ObservableList x) = toObservable x - -instance ToObservableList v (ObservableList v) where - toObservableList = id - -instance IsObservableList v (ObservableList v) where - observeIsEmpty# (ObservableList x) = observeIsEmpty# x - observeLength# (ObservableList x) = observeLength# x - attachListDeltaObserver# (ObservableList x) = attachListDeltaObserver# x - -instance Functor ObservableList where - fmap f x = toObservableList (MappedObservableList f x) - - --- | A single operation that can be applied to an `ObservableList`. Part of a --- `ObservableListDelta`. --- --- `Insert` indices are clamped to @[0, length]@. --- --- Applying `Delete` to a non-existing index is a no-op. -data ObservableListOperation v - = Insert Int v - | Delete Int - | DeleteAll - -instance Functor ObservableListOperation where - fmap f (Insert k v) = Insert k (f v) - fmap _ (Delete k) = Delete k - fmap _ DeleteAll = DeleteAll - - --- | A list of operations that is applied atomically to an `ObservableList`. -newtype ObservableListDelta v = ObservableListDelta (Seq (ObservableListOperation v)) - -instance Functor ObservableListDelta where - fmap f (ObservableListDelta ops) = ObservableListDelta (f <<$>> ops) - -instance Semigroup (ObservableListDelta v) where - ObservableListDelta x <> ObservableListDelta y = ObservableListDelta (go x y) - where - go :: Seq (ObservableListOperation v) -> Seq (ObservableListOperation v) -> Seq (ObservableListOperation v) - go _ ys@(DeleteAll :<| _) = ys - go (xs :|> Insert key1 _) (Delete key2 :<| ys) | key1 == key2 = go xs ys - go xs ys = xs <> ys - -instance Monoid (ObservableListDelta v) where - mempty = ObservableListDelta mempty - -singletonDelta :: ObservableListOperation v -> ObservableListDelta v -singletonDelta x = ObservableListDelta (Seq.singleton x) - --- | Pack a sequence of `ObservableListOperation`s into a `ObservableListDelta`. --- --- Removes unnecessary updates (all updates preceding an `DeleteAll` or an --- `Insert` followed by a `Delete` for the same element. -packDelta :: Foldable t => t (ObservableListOperation v) -> ObservableListDelta v -packDelta x = - -- The list is passed through the semigroup instance so duplicate updates are - -- filtered. - mconcat $ singletonDelta <$> toList x - - -data MappedObservableList v = forall a. MappedObservableList (a -> v) (ObservableList a) - -instance ToObservable (Seq v) (MappedObservableList v) where - toObservable (MappedObservableList fn observable) = fn <<$>> toObservable observable - -instance ToObservableList v (MappedObservableList v) - -instance IsObservableList v (MappedObservableList v) where - observeIsEmpty# (MappedObservableList _ observable) = observeIsEmpty# observable - observeLength# (MappedObservableList _ observable) = observeLength# observable - attachListDeltaObserver# (MappedObservableList fn observable) callback = - fmap fn <<$>> attachListDeltaObserver# observable (\update -> callback (fn <$> update)) - - -data ObservableListVar v = ObservableListVar { - content :: TVar (Seq v), - observers :: CallbackRegistry (Seq v), - deltaObservers :: CallbackRegistry (ObservableListDelta v), - keyObservers :: TVar (Seq (CallbackRegistry (Maybe v))) -} - -instance ToObservable (Seq v) (ObservableListVar v) - -instance IsObservable (Seq v) (ObservableListVar v) where - readObservable# ObservableListVar{content} = readTVar content - attachObserver# ObservableListVar{content, observers} callback = do - disposer <- registerCallback observers callback - value <- readTVar content - pure (disposer, value) - -instance ToObservableList v (ObservableListVar v) - -instance IsObservableList v (ObservableListVar v) where - observeIsEmpty# x = deduplicateObservable (Seq.null <$> toObservable x) - observeLength# x = deduplicateObservable (Seq.length <$> toObservable x) - attachListDeltaObserver# ObservableListVar{content, deltaObservers} callback = do - disposer <- registerCallback deltaObservers callback - initial <- readTVar content - pure (disposer, initial) - - -data ObservableListVarIndexObservable v = ObservableListVarIndexObservable Int (ObservableListVar v) - -instance ToObservable (Maybe v) (ObservableListVarIndexObservable v) - -instance IsObservable (Maybe v) (ObservableListVarIndexObservable v) where - attachObserver# (ObservableListVarIndexObservable index ObservableListVar{content, keyObservers}) callback = do - value <- Seq.lookup index <$> readTVar content - registry <- do - ko <- readTVar keyObservers - case Seq.lookup index ko of - Just registry -> pure registry - Nothing -> do - registry <- newCallbackRegistryWithEmptyCallback (modifyTVar keyObservers (Seq.deleteAt index)) - modifyTVar keyObservers (Seq.insertAt index registry) - pure registry - disposer <- registerCallback registry callback - pure (disposer, value) - - readObservable# (ObservableListVarIndexObservable index ObservableListVar{content}) = - Seq.lookup index <$> readTVar content - -newObservableListVar :: MonadSTMc NoRetry '[] m => [v] -> m (ObservableListVar v) -newObservableListVar values = liftSTMc @NoRetry @'[] do - content <- newTVar (Seq.fromList values) - observers <- newCallbackRegistry - deltaObservers <- newCallbackRegistry - keyObservers <- newTVar Seq.empty - pure ObservableListVar {content, observers, deltaObservers, keyObservers} - -newEmptyObservableListVar :: MonadSTMc NoRetry '[] m => m (ObservableListVar v) -newEmptyObservableListVar = newObservableListVar [] - -newObservableListVarIO :: MonadIO m => [v] -> m (ObservableListVar v) -newObservableListVarIO values = liftIO do - content <- newTVarIO (Seq.fromList values) - observers <- newCallbackRegistryIO - deltaObservers <- newCallbackRegistryIO - keyObservers <- newTVarIO Seq.empty - pure ObservableListVar {content, observers, deltaObservers, keyObservers} - -newEmptyObservableListVarIO :: MonadIO m => m (ObservableListVar v) -newEmptyObservableListVarIO = newObservableListVarIO [] - -insert :: forall v m. (MonadSTMc NoRetry '[] m) => Int -> v -> ObservableListVar v -> m () -insert index value ObservableListVar{content, observers, deltaObservers, keyObservers} = liftSTMc @NoRetry @'[] do - initial <- readTVar content - let clampedIndex = min (max index 0) (Foldable.length initial) - state <- stateTVar content (dup . Seq.insertAt clampedIndex value) - callCallbacks observers state - callCallbacks deltaObservers (singletonDelta (Insert clampedIndex value)) - mkr <- Seq.lookup index <$> readTVar keyObservers - forM_ mkr \keyRegistry -> callCallbacks keyRegistry (Just value) - -delete :: forall v m. (MonadSTMc NoRetry '[] m) => Int -> ObservableListVar v -> m () -delete index ObservableListVar{content, observers, deltaObservers, keyObservers} = liftSTMc @NoRetry @'[] do - initial <- readTVar content - when (index >= 0 && index < Foldable.length initial) do - let state = Seq.deleteAt index initial - callCallbacks observers state - callCallbacks deltaObservers (singletonDelta (Delete index)) - mkr <- Seq.lookup index <$> readTVar keyObservers - forM_ mkr \keyRegistry -> callCallbacks keyRegistry Nothing - -lookupDelete :: forall v m. (MonadSTMc NoRetry '[] m) => Int -> ObservableListVar v -> m (Maybe v) -lookupDelete index ObservableListVar{content, observers, deltaObservers, keyObservers} = liftSTMc @NoRetry @'[] do - initial <- readTVar content - if index >= 0 && index < Foldable.length initial - then do - (result, newList) <- stateTVar content \orig -> - let - result = Seq.lookup index orig - newList = Seq.deleteAt index orig - in ((result, newList), newList) - callCallbacks observers newList - callCallbacks deltaObservers (singletonDelta (Delete index)) - mkr <- Seq.lookup index <$> readTVar keyObservers - forM_ mkr \keyRegistry -> callCallbacks keyRegistry Nothing - pure result - else - pure Nothing diff --git a/quasar/src/Quasar/Observable/ObservableMap.hs b/quasar/src/Quasar/Observable/ObservableMap.hs deleted file mode 100644 index 3e43ad1..0000000 --- a/quasar/src/Quasar/Observable/ObservableMap.hs +++ /dev/null @@ -1,451 +0,0 @@ -module Quasar.Observable.ObservableMap ( - -- * ObservableMap interface - ToObservableMap(..), - attachMapDeltaObserver, - IsObservableMap(..), - ObservableMap, - mapWithKey, - lookup, - isEmpty, - length, - values, - items, - - empty, - singleton, - fromList, - - filter, - filterWithKey, - union, - unionWith, - unionWithKey, - - lookupMin, - lookupMax, - - -- ** Deltas - ObservableMapDelta(..), - ObservableMapOperation(..), - singletonDelta, - packDelta, - - -- * Mutable ObservableMapVar container - ObservableMapVar, - newObservableMapVar, - newObservableMapVarIO, - insert, - delete, - lookupDelete, -) where - -import Data.Foldable (find) -import Data.Foldable qualified as Foldable -import Data.Map.Strict (Map) -import Data.Map.Strict qualified as Map -import Data.Sequence (Seq(..)) -import Data.Sequence qualified as Seq -import Quasar.Observable -import Quasar.Observable.ObservableList (ObservableList, IsObservableList, ToObservableList(..), ObservableListDelta(..)) -import Quasar.Observable.ObservableList qualified as ObservableList -import Quasar.Prelude hiding (filter, length, lookup) -import Quasar.Resources.Disposer -import Quasar.Utils.CallbackRegistry -import Quasar.Utils.Fix -import Quasar.Utils.Map qualified as Map -import Data.Functor ((<&>)) - - -class ToObservable (Map k v) a => ToObservableMap k v a where - toObservableMap :: a -> ObservableMap k v - default toObservableMap :: IsObservableMap k v a => a -> ObservableMap k v - toObservableMap = ObservableMap - -class ToObservableMap k v a => IsObservableMap k v a where - observeIsEmpty# :: a -> Observable Bool - - observeLength# :: a -> Observable Int - - observeKey# :: Ord k => k -> a -> Observable (Maybe v) - - -- | Register a listener to observe changes to the whole map. The callback - -- will be invoked with the current state of the map immediately after - -- registering and after that will be invoked for every change to the map. - attachMapDeltaObserver# :: a -> (ObservableMapDelta k v -> STMc NoRetry '[] ()) -> STMc NoRetry '[] (TDisposer, Map k v) - -isEmpty :: ToObservableMap k v a => a -> Observable Bool -isEmpty x = observeIsEmpty# (toObservableMap x) - -length :: ToObservableMap k v a => a -> Observable Int -length x = observeLength# (toObservableMap x) - -lookup :: (ToObservableMap k v a, Ord k) => k -> a -> Observable (Maybe v) -lookup key x = observeKey# key (toObservableMap x) - -attachMapDeltaObserver :: (ToObservableMap k v a, MonadSTMc NoRetry '[] m) => a -> (ObservableMapDelta k v -> STMc NoRetry '[] ()) -> m (TDisposer, Map k v) -attachMapDeltaObserver x callback = liftSTMc $ attachMapDeltaObserver# (toObservableMap x) callback - -data ObservableMap k v = forall a. IsObservableMap k v a => ObservableMap a - -instance ToObservable (Map k v) (ObservableMap k v) where - toObservable (ObservableMap x) = toObservable x - -instance ToObservableMap k v (ObservableMap k v) where - toObservableMap = id - -instance IsObservableMap k v (ObservableMap k v) where - observeIsEmpty# (ObservableMap x) = observeIsEmpty# x - observeLength# (ObservableMap x) = observeLength# x - observeKey# key (ObservableMap x) = observeKey# key x - attachMapDeltaObserver# (ObservableMap x) = attachMapDeltaObserver# x - -instance Functor (ObservableMap k) where - fmap f x = toObservableMap (MappedObservableMap (const f) x) - -instance Ord k => Semigroup (ObservableMap k v) where - (<>) = union - -instance Ord k => Monoid (ObservableMap k v) where - mempty = toObservableMap (ConstObservableMap mempty) - - --- | A single operation that can be applied to an `ObservableMap`. Part of a --- `ObservableMapDelta`. --- --- Applying `Delete` to a non-existing key is a no-op. -data ObservableMapOperation k v = Insert k v | Delete k | DeleteAll - -instance Functor (ObservableMapOperation k) where - fmap f (Insert k v) = Insert k (f v) - fmap _ (Delete k) = Delete k - fmap _ DeleteAll = DeleteAll - - --- | A list of operations that is applied atomically to an `ObservableMap`. -newtype ObservableMapDelta k v = ObservableMapDelta (Seq (ObservableMapOperation k v)) - -instance Functor (ObservableMapDelta k) where - fmap f (ObservableMapDelta ops) = ObservableMapDelta (f <<$>> ops) - -instance Eq k => Semigroup (ObservableMapDelta k v) where - ObservableMapDelta x <> ObservableMapDelta y = ObservableMapDelta (go x y) - where - go :: Seq (ObservableMapOperation k v) -> Seq (ObservableMapOperation k v) -> Seq (ObservableMapOperation k v) - go _ ys@(DeleteAll :<| _) = ys - go (xs :|> Insert key1 _) ys@(Delete key2 :<| _) | key1 == key2 = go xs ys - go (xs :|> Delete key1) ys@(Delete key2 :<| _) | key1 == key2 = go xs ys - go xs ys = xs <> ys - -instance Eq k => Monoid (ObservableMapDelta k v) where - mempty = ObservableMapDelta mempty - -singletonDelta :: ObservableMapOperation k v -> ObservableMapDelta k v -singletonDelta x = ObservableMapDelta (Seq.singleton x) - -packDelta :: (Eq k, Foldable t) => t (ObservableMapOperation k v) -> ObservableMapDelta k v -packDelta x = - -- The list is passed through the semigroup instance so duplicate updates are - -- filtered. - mconcat $ singletonDelta <$> toList x - - -empty :: ObservableMap k v -empty = ObservableMap (ConstObservableMap Map.empty) - -singleton :: k -> v -> ObservableMap k v -singleton k v = ObservableMap (ConstObservableMap (Map.singleton k v)) - -fromList :: Ord k => [(k, v)] -> ObservableMap k v -fromList = ObservableMap . ConstObservableMap . Map.fromList - -newtype ConstObservableMap k v = ConstObservableMap (Map k v) - -instance ToObservable (Map k v) (ConstObservableMap k v) where - toObservable (ConstObservableMap x) = pure x - -instance ToObservableMap k v (ConstObservableMap k v) -instance IsObservableMap k v (ConstObservableMap k v) where - observeIsEmpty# (ConstObservableMap x) = pure (null x) - observeLength# (ConstObservableMap x) = pure (Foldable.length x) - observeKey# key (ConstObservableMap x) = pure (Map.lookup key x) - attachMapDeltaObserver# (ConstObservableMap x) _callback = pure (mempty, x) - - -data MappedObservableMap k v = forall a. MappedObservableMap (k -> a -> v) (ObservableMap k a) - -instance ToObservable (Map k v) (MappedObservableMap k v) where - toObservable (MappedObservableMap fn observable) = Map.mapWithKey fn <$> toObservable observable - -instance ToObservableMap k v (MappedObservableMap k v) - -instance IsObservableMap k v (MappedObservableMap k v) where - observeIsEmpty# (MappedObservableMap _ observable) = observeIsEmpty# observable - observeLength# (MappedObservableMap _ observable) = observeLength# observable - observeKey# key (MappedObservableMap fn observable) = fn key <<$>> observeKey# key observable - attachMapDeltaObserver# (MappedObservableMap fn observable) callback = - Map.mapWithKey fn <<$>> attachMapDeltaObserver# observable \update -> callback (mapDeltaWithKey update) - where - mapDeltaWithKey (ObservableMapDelta ops) = ObservableMapDelta (mapUpdateWithKey <$> ops) - mapUpdateWithKey (Insert k v) = Insert k (fn k v) - mapUpdateWithKey (Delete k) = Delete k - mapUpdateWithKey DeleteAll = DeleteAll - -mapWithKey :: ToObservableMap k v1 a => (k -> v1 -> v2) -> a -> ObservableMap k v2 -mapWithKey f x = ObservableMap (MappedObservableMap f (toObservableMap x)) - - -data ObservableMapVar k v = ObservableMapVar { - content :: TVar (Map k v), - observers :: CallbackRegistry (Map k v), - deltaObservers :: CallbackRegistry (ObservableMapDelta k v), - keyObservers :: TVar (Map k (CallbackRegistry (Maybe v))) -} - -instance ToObservable (Map k v) (ObservableMapVar k v) - -instance IsObservable (Map k v) (ObservableMapVar k v) where - readObservable# ObservableMapVar{content} = readTVar content - attachObserver# ObservableMapVar{content, observers} callback = do - disposer <- registerCallback observers callback - value <- readTVar content - pure (disposer, value) - -instance ToObservableMap k v (ObservableMapVar k v) - -instance IsObservableMap k v (ObservableMapVar k v) where - observeIsEmpty# x = deduplicateObservable (Map.null <$> toObservable x) - observeLength# x = deduplicateObservable (Foldable.length <$> toObservable x) - observeKey# key x = toObservable (ObservableMapVarKeyObservable key x) - attachMapDeltaObserver# ObservableMapVar{content, deltaObservers} callback = do - disposer <- registerCallback deltaObservers callback - initial <- readTVar content - pure (disposer, initial) - - -data ObservableMapVarKeyObservable k v = ObservableMapVarKeyObservable k (ObservableMapVar k v) - -instance Ord k => ToObservable (Maybe v) (ObservableMapVarKeyObservable k v) - -instance Ord k => IsObservable (Maybe v) (ObservableMapVarKeyObservable k v) where - attachObserver# (ObservableMapVarKeyObservable key ObservableMapVar{content, keyObservers}) callback = do - value <- Map.lookup key <$> readTVar content - registry <- do - ko <- readTVar keyObservers - case Map.lookup key ko of - Just registry -> pure registry - Nothing -> do - registry <- newCallbackRegistryWithEmptyCallback (modifyTVar keyObservers (Map.delete key)) - modifyTVar keyObservers (Map.insert key registry) - pure registry - disposer <- registerCallback registry callback - pure (disposer, value) - - readObservable# (ObservableMapVarKeyObservable key ObservableMapVar{content}) = - Map.lookup key <$> readTVar content - -newObservableMapVar :: MonadSTMc NoRetry '[] m => m (ObservableMapVar k v) -newObservableMapVar = liftSTMc @NoRetry @'[] do - content <- newTVar Map.empty - observers <- newCallbackRegistry - deltaObservers <- newCallbackRegistry - keyObservers <- newTVar Map.empty - pure ObservableMapVar {content, observers, deltaObservers, keyObservers} - -newObservableMapVarIO :: MonadIO m => m (ObservableMapVar k v) -newObservableMapVarIO = liftIO do - content <- newTVarIO Map.empty - observers <- newCallbackRegistryIO - deltaObservers <- newCallbackRegistryIO - keyObservers <- newTVarIO Map.empty - pure ObservableMapVar {content, observers, deltaObservers, keyObservers} - -insert :: forall k v m. (Ord k, MonadSTMc NoRetry '[] m) => k -> v -> ObservableMapVar k v -> m () -insert key value ObservableMapVar{content, observers, deltaObservers, keyObservers} = liftSTMc @NoRetry @'[] do - state <- stateTVar content (dup . Map.insert key value) - callCallbacks observers state - callCallbacks deltaObservers (singletonDelta (Insert key value)) - mkr <- Map.lookup key <$> readTVar keyObservers - forM_ mkr \keyRegistry -> callCallbacks keyRegistry (Just value) - -delete :: forall k v m. (Ord k, MonadSTMc NoRetry '[] m) => k -> ObservableMapVar k v -> m () -delete key ObservableMapVar{content, observers, deltaObservers, keyObservers} = liftSTMc @NoRetry @'[] do - state <- stateTVar content (dup . Map.delete key) - callCallbacks observers state - callCallbacks deltaObservers (singletonDelta (Delete key)) - mkr <- Map.lookup key <$> readTVar keyObservers - forM_ mkr \keyRegistry -> callCallbacks keyRegistry Nothing - -lookupDelete :: forall k v m. (Ord k, MonadSTMc NoRetry '[] m) => k -> ObservableMapVar k v -> m (Maybe v) -lookupDelete key ObservableMapVar{content, observers, deltaObservers, keyObservers} = liftSTMc @NoRetry @'[] do - (result, newMap) <- stateTVar content \orig -> - let (result, newMap) = Map.lookupDelete key orig - in ((result, newMap), newMap) - callCallbacks observers newMap - callCallbacks deltaObservers (singletonDelta (Delete key)) - mkr <- Map.lookup key <$> readTVar keyObservers - forM_ mkr \keyRegistry -> callCallbacks keyRegistry Nothing - pure result - -data FilteredObservableMap k v = FilteredObservableMap (k -> v -> Bool) (ObservableMap k v) - -instance ToObservable (Map k v) (FilteredObservableMap k v) where - toObservable (FilteredObservableMap predicate upstream) = - mapObservable (Map.filterWithKey predicate) upstream - -instance ToObservableMap k v (FilteredObservableMap k v) - -instance IsObservableMap k v (FilteredObservableMap k v) where - observeIsEmpty# x = - -- NOTE memory footprint could be improved by only tracking the keys (e.g. an (ObservableSet k)) - deduplicateObservable (Map.null <$> toObservable x) - - observeLength# x = - -- NOTE memory footprint could be improved by only tracking the keys (e.g. an (ObservableSet k)) - deduplicateObservable (Foldable.length <$> toObservable x) - - observeKey# key (FilteredObservableMap predicate upstream) = - find (predicate key) <$> observeKey# key upstream - - attachMapDeltaObserver# (FilteredObservableMap predicate upstream) callback = - Map.filterWithKey predicate <<$>> attachMapDeltaObserver# upstream \delta -> callback (filterDelta delta) - where - filterDelta :: ObservableMapDelta k v -> ObservableMapDelta k v - filterDelta (ObservableMapDelta ops) = ObservableMapDelta (filterOperation <$> ops) - filterOperation :: ObservableMapOperation k v -> ObservableMapOperation k v - filterOperation (Insert key value) = - if predicate key value then Insert key value else Delete key - filterOperation (Delete key) = Delete key - filterOperation DeleteAll = DeleteAll - -filter :: IsObservableMap k v a => (v -> Bool) -> a -> ObservableMap k v -filter predicate = filterWithKey (const predicate) - -filterWithKey :: IsObservableMap k v a => (k -> v -> Bool) -> a -> ObservableMap k v -filterWithKey predicate upstream = - toObservableMap (FilteredObservableMap predicate (toObservableMap upstream)) - - -data ObservableMapValues v = forall k. Ord k => ObservableMapValues (ObservableMap k v) - -instance ToObservable (Seq v) (ObservableMapValues v) where - toObservable (ObservableMapValues x) = mapObservable (Seq.fromList . Map.elems) x - -instance ToObservableList v (ObservableMapValues v) - -instance IsObservableList v (ObservableMapValues v) where - observeIsEmpty# (ObservableMapValues x) = observeIsEmpty# x - - observeLength# (ObservableMapValues x) = observeLength# x - - attachListDeltaObserver# (ObservableMapValues x) callback = do - mfixExtra \initialFixed -> do - var <- newTVar initialFixed - (disposer, initial) <- attachMapDeltaObserver# x \(ObservableMapDelta mapOps) -> do - listOperations <- forM mapOps \case - Insert key value -> do - (m, replaced) <- stateTVar var ((\(b, m) -> ((m, b), m)) . Map.insertCheckReplace key value) - let index = Map.findIndex key m - pure if replaced - then Seq.fromList [ObservableList.Delete index, ObservableList.Insert index value] - else Seq.singleton (ObservableList.Insert index value) - Delete key -> do - m <- readTVar var - let i = Map.lookupIndex key m - writeTVar var (Map.delete key m) - pure case i of - Nothing -> mempty - Just i' -> Seq.singleton (ObservableList.Delete i') - DeleteAll -> do - writeTVar var mempty - pure (Seq.singleton ObservableList.DeleteAll) - callback (ObservableListDelta (fold listOperations)) - pure ((disposer, Seq.fromList (Map.elems initial)), initial) - -values :: (Ord k, IsObservableMap k v a) => a -> ObservableList v -values x = toObservableList (ObservableMapValues (toObservableMap x)) - -items :: (Ord k, IsObservableMap k v a) => a -> ObservableList (k, v) -items x = values $ mapWithKey (,) x - - -data ObservableMapUnion k v = Ord k => ObservableMapUnion (k -> v -> v -> v) (ObservableMap k v) (ObservableMap k v) - -instance ToObservable (Map k v) (ObservableMapUnion k v) where - toObservable (ObservableMapUnion fn x y) = - -- TODO use observableMapToObservable - liftA2 (Map.unionWithKey fn) (toObservable x) (toObservable y) - -instance ToObservableMap k v (ObservableMapUnion k v) - -instance IsObservableMap k v (ObservableMapUnion k v) where - observeIsEmpty# (ObservableMapUnion _fn x y) = - deduplicateObservable (liftA2 (&&) (observeIsEmpty# x) (observeIsEmpty# y)) - - observeLength# x = - -- NOTE memory footprint could be improved by only tracking the keys (e.g. an (ObservableSet k)) - deduplicateObservable (Foldable.length <$> toObservable x) - - observeKey# key (ObservableMapUnion fn x y) = - liftA2 (liftA2 (fn key)) (observeKey# key x) (observeKey# key y) - - attachMapDeltaObserver# (ObservableMapUnion fn ox oy) callback = do - mfixExtra \initialFixed -> do - var <- newTVar initialFixed - (disposerX, initialX) <- attachMapDeltaObserver# ox \(ObservableMapDelta mapOps) -> do - (x, y) <- readTVar var - finalOps <- forM (toList mapOps) \case - Insert key value -> do - writeTVar var (Map.insert key value x, y) - case Map.lookup key y of - Nothing -> pure [Delete key] - Just other -> pure [Insert key other] - Delete key -> do - writeTVar var (Map.delete key x, y) - case Map.lookup key y of - Nothing -> pure [Delete key] - Just other -> pure [Insert key other] - DeleteAll -> do - writeTVar var (mempty, y) - pure $ Map.keys x <&> \key -> - case Map.lookup key y of - Nothing -> Delete key - Just other -> Insert key other - callback (packDelta (join finalOps)) - (disposerY, initialY) <- attachMapDeltaObserver# oy \(ObservableMapDelta mapOps) -> do - (x, y) <- readTVar var - finalOps <- forM (toList mapOps) \case - Insert key value -> do - writeTVar var (x, Map.insert key value y) - case Map.lookup key x of - Nothing -> pure [Delete key] - Just other -> pure [Insert key other] - Delete key -> do - writeTVar var (x, Map.delete key y) - case Map.lookup key x of - Nothing -> pure [Delete key] - Just other -> pure [Insert key other] - DeleteAll -> do - writeTVar var (x, mempty) - pure $ Map.keys y <&> \key -> - case Map.lookup key x of - Nothing -> Delete key - Just other -> Insert key other - callback (packDelta (join finalOps)) - - let initial = Map.unionWithKey fn initialX initialY - pure ((disposerX <> disposerY, initial), (initialX, initialY)) - -unionWithKey :: Ord k => (k -> v -> v -> v) -> ObservableMap k v -> ObservableMap k v -> ObservableMap k v -unionWithKey fn x y = ObservableMap (ObservableMapUnion fn x y) - -unionWith :: Ord k => (v -> v -> v) -> ObservableMap k v -> ObservableMap k v -> ObservableMap k v -unionWith fn = unionWithKey \_ x y -> fn x y - -union :: Ord k => ObservableMap k v -> ObservableMap k v -> ObservableMap k v -union = unionWithKey \_ x _ -> x - -lookupMin :: ObservableMap k v -> Observable (Maybe (k, v)) -lookupMin x = Map.lookupMin <$> toObservable x - -lookupMax :: ObservableMap k v -> Observable (Maybe (k, v)) -lookupMax x = Map.lookupMax <$> toObservable x diff --git a/quasar/src/Quasar/Observable/ObservablePriority.hs b/quasar/src/Quasar/Observable/ObservablePriority.hs deleted file mode 100644 index 5fb0e8b..0000000 --- a/quasar/src/Quasar/Observable/ObservablePriority.hs +++ /dev/null @@ -1,125 +0,0 @@ -module Quasar.Observable.ObservablePriority ( - --ObservablePriority, - --create, - --insertValue, -) where - ---import Control.Concurrent.STM (atomically) ---import Data.HashMap.Strict qualified as HM ---import Data.List (maximumBy) ---import Data.List.NonEmpty (NonEmpty(..), nonEmpty) ---import Data.List.NonEmpty qualified as NonEmpty ---import Data.Ord (comparing) ---import Quasar.Disposable ---import Quasar.Observable ---import Quasar.Prelude --- ---type Entry v = (Unique, v) --- ----- | Mutable data structure that stores values of type "v" with an assiciated priority "p". The `IsObservable` instance can be used to get or observe the value with the highest priority. ---newtype ObservablePriority p v = ObservablePriority (MVar (Internals p v)) --- ---instance IsRetrievable (Maybe v) (ObservablePriority p v) where --- retrieve (ObservablePriority mvar) = liftIO $ pure . getValueFromInternals <$> readMVar mvar --- where --- getValueFromInternals :: Internals p v -> Maybe v --- getValueFromInternals Internals{current=Nothing} = Nothing --- getValueFromInternals Internals{current=Just (_, _, value)} = Just value ---instance IsObservable (Maybe v) (ObservablePriority p v) where --- observe = undefined --- --oldObserve (ObservablePriority mvar) callback = do --- -- key <- newUnique --- -- modifyMVar_ mvar $ \internals@Internals{subscribers} -> do --- -- -- Call listener --- -- callback (pure (currentValue internals)) --- -- pure internals{subscribers = HM.insert key callback subscribers} --- -- newDisposable (unsubscribe key) --- -- where --- -- unsubscribe :: Unique -> IO () --- -- unsubscribe key = modifyMVar_ mvar $ \internals@Internals{subscribers} -> pure internals{subscribers=HM.delete key subscribers} --- ---type PriorityMap p v = HM.HashMap p (NonEmpty (Entry v)) --- ---data Internals p v = Internals { --- priorityMap :: PriorityMap p v, --- current :: Maybe (Unique, p, v), --- subscribers :: HM.HashMap Unique (ObservableCallback (Maybe v)) ---} --- ----- | Create a new `ObservablePriority` data structure. ---create :: MonadIO m => m (ObservablePriority p v) ---create = liftIO do --- ObservablePriority <$> newMVar Internals { --- priorityMap = HM.empty, --- current = Nothing, --- subscribers = HM.empty --- } --- ---currentValue :: Internals k v -> Maybe v ---currentValue Internals{current} = (\(_, _, value) -> value) <$> current --- ----- | Insert a value with an assigned priority into the data structure. If the priority is higher than the current highest priority the value will become the current value (and will be sent to subscribers). Otherwise the value will be stored and will only become the current value when all values with a higher priority and all values with the same priority that have been inserted earlier have been removed. ----- Returns an `Disposable` that can be used to remove the value from the data structure. ---insertValue :: forall p v m. MonadIO m => (Ord p, Hashable p) => ObservablePriority p v -> p -> v -> m Disposable ---insertValue (ObservablePriority mvar) priority value = liftIO $ modifyMVar mvar $ \internals -> do --- key <- newUnique --- newInternals <- insertValue' key internals --- (newInternals,) <$> atomically (newDisposable (removeValue key)) --- where --- insertValue' :: Unique -> Internals p v -> IO (Internals p v) --- insertValue' key internals@Internals{priorityMap, current} --- | hasToUpdateCurrent current = do --- let newInternals = internals{priorityMap=insertEntry priorityMap, current=Just (key, priority, value)} --- notifySubscribers newInternals --- pure newInternals --- | otherwise = pure internals{priorityMap=insertEntry priorityMap} --- where --- insertEntry :: PriorityMap p v -> PriorityMap p v --- insertEntry = HM.alter addToEntryList priority --- addToEntryList :: Maybe (NonEmpty (Entry v)) -> Maybe (NonEmpty (Entry v)) --- addToEntryList Nothing = Just newEntryList --- addToEntryList (Just list) = Just (list <> newEntryList) --- newEntryList :: NonEmpty (Entry v) --- newEntryList = (key, value) :| [] --- --- hasToUpdateCurrent :: (Maybe (Unique, p, v)) -> Bool --- hasToUpdateCurrent Nothing = True --- hasToUpdateCurrent (Just (_, oldPriority, _)) = priority > oldPriority --- --- removeValue :: Unique -> IO () --- removeValue key = modifyMVar_ mvar removeValue' --- where --- removeValue' :: Internals p v -> IO (Internals p v) --- removeValue' internals@Internals{priorityMap, current} = do --- let newInternals = internals{priorityMap = removeEntry priorityMap} --- if hasToUpdateCurrent current --- then updateCurrent newInternals --- else pure newInternals --- --- removeEntry :: PriorityMap p v -> PriorityMap p v --- removeEntry = HM.alter removeEntryFromList priority --- removeEntryFromList :: Maybe (NonEmpty (Entry v)) -> Maybe (NonEmpty (Entry v)) --- removeEntryFromList Nothing = Nothing --- removeEntryFromList (Just list) = nonEmpty $ NonEmpty.filter (\(key', _) -> key' /= key) list --- --- updateCurrent :: Internals p v -> IO (Internals p v) --- updateCurrent internals@Internals{priorityMap} = do --- let newInternals = internals{current = selectCurrent $ HM.toList priorityMap} --- notifySubscribers newInternals --- pure newInternals --- selectCurrent :: [(p, (NonEmpty (Entry v)))] -> Maybe (Unique, p, v) --- selectCurrent [] = Nothing --- selectCurrent list = Just . selectCurrentFromList . maximumBy (comparing fst) $ list --- where --- selectCurrentFromList :: (p, (NonEmpty (Entry v))) -> (Unique, p, v) --- selectCurrentFromList (priority', entryList) = (key', priority', value') --- where --- (key', value') = NonEmpty.head entryList --- --- hasToUpdateCurrent :: (Maybe (Unique, p, v)) -> Bool --- hasToUpdateCurrent Nothing = False --- hasToUpdateCurrent (Just (oldKey, _, _)) = key == oldKey --- --- ---notifySubscribers :: forall p v. Internals p v -> IO () ---notifySubscribers Internals{subscribers, current} = forM_ subscribers (\callback -> callback (pure ((\(_, _, value) -> value) <$> current)))