diff --git a/util/sync/manager.go b/util/sync/manager.go deleted file mode 100644 index fa8883e4f7..0000000000 --- a/util/sync/manager.go +++ /dev/null @@ -1,89 +0,0 @@ -package sync - -import ( - "time" - - "github.com/pkg/errors" - "go-micro.dev/v5/store" -) - -type operation struct { - deadline time.Time - record *store.Record - operation action - retries int - maxiumum int -} - -// action represents the type of a queued operation. -type action int - -const ( - readOp action = iota + 1 - writeOp - deleteOp - listOp -) - -func (c *syncStore) syncManager() { - tickerAggregator := make(chan struct{ index int }) - for i, ticker := range c.pendingWriteTickers { - go func(index int, c chan struct{ index int }, t *time.Ticker) { - for range t.C { - c <- struct{ index int }{index: index} - } - }(i, tickerAggregator, ticker) - } - for { - select { - case i := <-tickerAggregator: - println(i.index, "ticked") - c.processQueue(i.index) - } - } -} - -func (c *syncStore) processQueue(index int) { - c.Lock() - defer c.Unlock() - q := c.pendingWrites[index] - for i := 0; i < q.Len(); i++ { - r, ok := q.PopFront() - if !ok { - panic(errors.Errorf("retrieved an invalid value from the L%d sync queue", index+1)) - } - ir, ok := r.(*internalRecord) - if !ok { - panic(errors.Errorf("retrieved a non-internal record from the L%d sync queue", index+1)) - } - if !ir.expiresAt.IsZero() && time.Now().After(ir.expiresAt) { - continue - } - nr := &store.Record{ - Key: ir.key, - } - nr.Value = make([]byte, len(ir.value)) - copy(nr.Value, ir.value) - if !ir.expiresAt.IsZero() { - nr.Expiry = time.Until(ir.expiresAt) - } - // Todo = internal queue also has to hold the corresponding store.WriteOptions - if err := c.syncOpts.Stores[index+1].Write(nr); err != nil { - // some error, so queue for retry and bail - q.PushBack(ir) - return - } - } -} - -func intpow(x, y int64) int64 { - result := int64(1) - for 0 != y { - if 0 != (y & 1) { - result *= x - } - y >>= 1 - x *= x - } - return result -} diff --git a/util/sync/options.go b/util/sync/options.go deleted file mode 100644 index c708b9289a..0000000000 --- a/util/sync/options.go +++ /dev/null @@ -1,42 +0,0 @@ -package sync - -import ( - "time" - - "go-micro.dev/v5/store" -) - -// Options represents Sync options. -type Options struct { - // Stores represents layers in the sync in ascending order. L0, L1, L2, etc - Stores []store.Store - // SyncInterval is the duration between syncs from L0 to L1 - SyncInterval time.Duration - // SyncMultiplier is the multiplication factor between each store. - SyncMultiplier int64 -} - -// Option sets Sync Options. -type Option func(o *Options) - -// Stores sets the layers that make up the sync. -func Stores(stores ...store.Store) Option { - return func(o *Options) { - o.Stores = make([]store.Store, len(stores)) - copy(o.Stores, stores) - } -} - -// SyncInterval sets the duration between syncs from L0 to L1. -func SyncInterval(d time.Duration) Option { - return func(o *Options) { - o.SyncInterval = d - } -} - -// SyncMultiplier sets the multiplication factor for time to wait each sync layer. -func SyncMultiplier(i int64) Option { - return func(o *Options) { - o.SyncMultiplier = i - } -} diff --git a/util/sync/sync.go b/util/sync/sync.go deleted file mode 100644 index 648edd7171..0000000000 --- a/util/sync/sync.go +++ /dev/null @@ -1,114 +0,0 @@ -// Package syncs will sync multiple stores -package sync - -import ( - "fmt" - "sync" - "time" - - "github.com/ef-ds/deque" - "github.com/pkg/errors" - "go-micro.dev/v5/store" -) - -// Sync implements a sync in for stores. -type Sync interface { - // Implements the store interface - store.Store - // Force a full sync - Sync() error -} - -type syncStore struct { - storeOpts store.Options - pendingWrites []*deque.Deque - pendingWriteTickers []*time.Ticker - syncOpts Options - sync.RWMutex -} - -// NewSync returns a new Sync. -func NewSync(opts ...Option) Sync { - c := &syncStore{} - for _, o := range opts { - o(&c.syncOpts) - } - if c.syncOpts.SyncInterval == 0 { - c.syncOpts.SyncInterval = 1 * time.Minute - } - if c.syncOpts.SyncMultiplier == 0 { - c.syncOpts.SyncMultiplier = 5 - } - return c -} - -func (c *syncStore) Close() error { - return nil -} - -func (c *syncStore) Init(opts ...store.Option) error { - for _, o := range opts { - o(&c.storeOpts) - } - if len(c.syncOpts.Stores) == 0 { - return errors.New("the sync has no stores") - } - if c.storeOpts.Context == nil { - return errors.New("please provide a context to the sync. Canceling the context signals that the sync is being disposed and syncs the sync") - } - for _, s := range c.syncOpts.Stores { - if err := s.Init(); err != nil { - return errors.Wrapf(err, "Store %s failed to Init()", s.String()) - } - } - c.pendingWrites = make([]*deque.Deque, len(c.syncOpts.Stores)-1) - c.pendingWriteTickers = make([]*time.Ticker, len(c.syncOpts.Stores)-1) - for i := 0; i < len(c.pendingWrites); i++ { - c.pendingWrites[i] = deque.New() - c.pendingWrites[i].Init() - c.pendingWriteTickers[i] = time.NewTicker(c.syncOpts.SyncInterval * time.Duration(intpow(c.syncOpts.SyncMultiplier, int64(i)))) - } - go c.syncManager() - return nil -} - -// Options returns the sync's store options. -func (c *syncStore) Options() store.Options { - return c.storeOpts -} - -// String returns a printable string describing the sync. -func (c *syncStore) String() string { - backends := make([]string, len(c.syncOpts.Stores)) - for i, s := range c.syncOpts.Stores { - backends[i] = s.String() - } - return fmt.Sprintf("sync %v", backends) -} - -func (c *syncStore) List(opts ...store.ListOption) ([]string, error) { - return c.syncOpts.Stores[0].List(opts...) -} - -func (c *syncStore) Read(key string, opts ...store.ReadOption) ([]*store.Record, error) { - return c.syncOpts.Stores[0].Read(key, opts...) -} - -func (c *syncStore) Write(r *store.Record, opts ...store.WriteOption) error { - return c.syncOpts.Stores[0].Write(r, opts...) -} - -// Delete removes a key from the sync. -func (c *syncStore) Delete(key string, opts ...store.DeleteOption) error { - return c.syncOpts.Stores[0].Delete(key, opts...) -} - -func (c *syncStore) Sync() error { - return nil -} - -type internalRecord struct { - expiresAt time.Time - key string - value []byte -}