Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add utility functions similar to those found in stdlib io package #549

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions storage/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,13 @@ type PeekableStorage interface {
Peek(ctx context.Context, key string) ([]byte, io.Closer, error)
}

// FinalizableStorage is a future-detection interface in which a storage implementation can be finalized,
// indicating any addition reads, writes, or function calls will fale. FinalizableStorage is analogous
// IPLD concept to io.Closer in the stdlib.
type FinalizableStorage interface {
Finalize() error
}

// the following are all hypothetical additional future interfaces (in varying degress of speculativeness):

// FUTURE: an EnumerableStorage API, that lets you list all keys present?
Expand Down
236 changes: 236 additions & 0 deletions storage/pipe.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,236 @@
package storage

import (
"context"
"errors"
"fmt"
"io"
"sync"
)

type storageUnit struct {
key string
data []byte
}

// onceError is an object that will only store an error once.
type onceError struct {
sync.Mutex // guards following
err error
}

func (a *onceError) Store(err error) {
a.Lock()
defer a.Unlock()
if a.err != nil {
return
}
a.err = err
}
func (a *onceError) Load() error {
a.Lock()
defer a.Unlock()
return a.err
}

// ErrMismatchedKey is the error used when read and write keys don't match
var ErrMismatchedKey = errors.New("put/get keys do not match")

// A pipeStorage is the shared pipeStorage structure underlying ReadablePipeStorage and WritablePipeStorage.
type pipeStorage struct {
wrMu sync.Mutex // Serializes Write operations
wrCh chan storageUnit
rdCh chan struct{}

once sync.Once // Protects closing done
done chan struct{}
rerr onceError
werr onceError
keysLk sync.RWMutex
keys map[string]struct{}
}

func (p *pipeStorage) has(ctx context.Context, key string) (bool, error) {
select {
case <-p.done:
return false, p.readableFinalizeError()
default:
}
p.keysLk.RLock()
defer p.keysLk.RUnlock()
_, ok := p.keys[key]
return ok, nil
}

func (p *pipeStorage) get(ctx context.Context, key string) ([]byte, error) {
select {
case <-p.done:
return nil, p.readableFinalizeError()
default:
}

select {
case su := <-p.wrCh:
p.rdCh <- struct{}{}
if su.key != key {
return nil, fmt.Errorf("%w: put %s, got %s", ErrMismatchedKey, su.key, key)
}
return su.data, nil
case <-ctx.Done():
return nil, ctx.Err()
case <-p.done:
return nil, p.readableFinalizeError()
}
}

func (p *pipeStorage) finalizeReadable(err error) error {
if err == nil {
err = io.ErrClosedPipe
}
p.rerr.Store(err)
p.once.Do(func() { close(p.done) })
return nil
}

func (p *pipeStorage) put(ctx context.Context, key string, b []byte) error {
select {
case <-p.done:
return p.writableFinalizeError()
default:
p.wrMu.Lock()
defer p.wrMu.Unlock()
}

select {
case p.wrCh <- storageUnit{key, b}:
p.keysLk.Lock()
p.keys[key] = struct{}{}
p.keysLk.Unlock()
<-p.rdCh
return nil
case <-ctx.Done():
return ctx.Err()
case <-p.done:
return p.writableFinalizeError()
}
}

func (p *pipeStorage) finalizeWritable(err error) error {
if err == nil {
err = io.EOF
}
p.werr.Store(err)
p.once.Do(func() { close(p.done) })
return nil
}

// readableFinalizeError is considered internal to the pipe type.
func (p *pipeStorage) readableFinalizeError() error {
rerr := p.rerr.Load()
if werr := p.werr.Load(); rerr == nil && werr != nil {
return werr
}
return io.ErrClosedPipe
}

// writableFinalizeError is considered internal to the pipe type.
func (p *pipeStorage) writableFinalizeError() error {
werr := p.werr.Load()
if rerr := p.rerr.Load(); werr == nil && rerr != nil {
return rerr
}
return io.ErrClosedPipe
}

// A ReadablePipeStorage is the read half of a pipe.
type ReadablePipeStorage struct {
p *pipeStorage
}

// Has implements the Storage interface
func (r *ReadablePipeStorage) Has(ctx context.Context, key string) (bool, error) {
return r.p.has(ctx, key)
}

// Get implements the ReadableStorage interface:
// it reads data from the pipe, blocking until a writer
// arrives or the write end is closed.
// If the write end is closed with an error, that error is
// returned as err; otherwise err is EOF.
func (r *ReadablePipeStorage) Get(ctx context.Context, key string) ([]byte, error) {
return r.p.get(ctx, key)
}

// Finalize closes the reader; subsequent writes to the
// write half of the pipe will return the error ErrClosedPipe.
func (r *ReadablePipeStorage) Finalize() error {
return r.FinalizeWithError(nil)
}

// FinalizeWithError closes the reader; subsequent writes
// to the write half of the pipe will return the error err.
//
// FinalizeWithError never overwrites the previous error if it exists
// and always returns nil.
func (r *ReadablePipeStorage) FinalizeWithError(err error) error {
return r.p.finalizeReadable(err)
}

// A WritablePipeStorage is the write half of a pipe.
type WritablePipeStorage struct {
p *pipeStorage
}

// Has implements the Storage interface
func (w *WritablePipeStorage) Has(ctx context.Context, key string) (bool, error) {
return w.p.has(ctx, key)
}

// Put implements the standard Write interface:
// it writes data to the pipe, blocking until one or more readers
// have consumed all the data or the read end is closed.
// If the read end is closed with an error, that err is
// returned as err; otherwise err is ErrClosedPipe.
func (w *WritablePipeStorage) Put(ctx context.Context, key string, data []byte) error {
return w.p.put(ctx, key, data)
}

// Finalize closes the writer; subsequent reads from the
// read half of the pipe will return no bytes and EOF.
func (w *WritablePipeStorage) Finalize() error {
return w.FinalizeWithError(nil)
}

// FinalizeWithError closes the writer; subsequent reads from the
// read half of the pipe will return no bytes and the error err,
// or EOF if err is nil.
//
// FinalizeWithError never overwrites the previous error if it exists
// and always returns nil.
func (w *WritablePipeStorage) FinalizeWithError(err error) error {
return w.p.finalizeWritable(err)
}

// PipeStorage creates a synchronous in-memory pipe.
// It can be used to connect code expecting an io.Reader
// with code expecting an io.Writer.
//
// Reads and Writes on the pipe are matched one to one
// except when multiple Reads are needed to consume a single Write.
// That is, each Write to the PipeWriter blocks until it has satisfied
// one or more Reads from the PipeReader that fully consume
// the written data.
// The data is copied directly from the Write to the corresponding
// Read (or Reads); there is no internal buffering.
//
// It is safe to call Read and Write in parallel with each other or with Close.
// Parallel calls to Read and parallel calls to Write are also safe:
// the individual calls will be gated sequentially.
func PipeStorage() (*ReadablePipeStorage, *WritablePipeStorage) {
p := &pipeStorage{
wrCh: make(chan storageUnit),
rdCh: make(chan struct{}),
done: make(chan struct{}),
}
return &ReadablePipeStorage{p}, &WritablePipeStorage{p}
}
63 changes: 63 additions & 0 deletions storage/teestorage.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package storage

import (
"context"
"io"
)

type teeStorage struct {
ReadableStorage
out WritableStorage
}

type teeReadCloser struct {
io.Reader
readCloser io.Closer
key string
writeClose func(string) error
}

func (trc teeReadCloser) Close() error {
err := trc.readCloser.Close()
if err != nil {
return err
}
err = trc.writeClose(trc.key)
if err != nil {
return err
}
return nil
}

func (ts teeStorage) GetStream(ctx context.Context, key string) (io.ReadCloser, error) {
rdr, err := GetStream(ctx, ts.ReadableStorage, key)
if err != nil {
return nil, err
}
writer, committer, err := PutStream(ctx, ts.out)
if err != nil {
return nil, err
}
return teeReadCloser{
Reader: io.TeeReader(rdr, writer),
readCloser: rdr,
writeClose: committer,
key: key,
}, nil
}

func (ts teeStorage) Get(ctx context.Context, key string) ([]byte, error) {
data, err := ts.ReadableStorage.Get(ctx, key)
if err != nil {
return nil, err
}
err = ts.out.Put(ctx, key, data)
return data, err
}

func TeeStorage(in ReadableStorage, out WritableStorage) ReadableStorage {
return teeStorage{
ReadableStorage: in,
out: out,
}
}