Skip to content

Commit

Permalink
fix: mem-copy for READ benchmarks
Browse files Browse the repository at this point in the history
  • Loading branch information
harshavardhana committed Jan 10, 2025
1 parent 269798d commit 7d71916
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 117 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ require (
github.com/fatih/color v1.18.0
github.com/felixge/fgprof v0.9.5
github.com/google/uuid v1.6.0
github.com/minio/pkg/v3 v3.0.26-0.20250106155027-2becdc33e233
github.com/minio/pkg/v3 v3.0.28
github.com/ncw/directio v1.0.5
github.com/spf13/cobra v1.8.1
github.com/spf13/viper v1.19.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovk
github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/minio/pkg/v3 v3.0.26-0.20250106155027-2becdc33e233 h1:SR5q/92Xqkj2Zg3O3sPnJvms+/ZMN5W1mSiA3htdnhc=
github.com/minio/pkg/v3 v3.0.26-0.20250106155027-2becdc33e233/go.mod h1:mIaN552nu0D2jiSk5BQC8LB25f44ytbOBJCuLtksX7Q=
github.com/minio/pkg/v3 v3.0.28 h1:8tSuZnJbjc3C3DM2DEh4ZnSWjMZdccd679stk8sPD60=
github.com/minio/pkg/v3 v3.0.28/go.mod h1:mIaN552nu0D2jiSk5BQC8LB25f44ytbOBJCuLtksX7Q=
github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY=
github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/ncw/directio v1.0.5 h1:JSUBhdjEvVaJvOoyPAbcW0fnd0tvRXD76wEfZ1KcQz4=
Expand Down
202 changes: 88 additions & 114 deletions pkg/dperf/run_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,70 +31,6 @@ import (
"golang.org/x/sys/unix"
)

// odirectReader - to support O_DIRECT reads for erasure backends.
type odirectReader struct {
fd int
Bufp *[]byte
buf []byte
err error
seenRead bool
alignment bool

ctx context.Context
}

// Read - Implements Reader interface.
func (o *odirectReader) Read(buf []byte) (n int, err error) {
if o.ctx.Err() != nil {
return 0, o.ctx.Err()
}
if o.err != nil && (len(o.buf) == 0 || !o.seenRead) {
return 0, o.err
}
if !o.seenRead {
o.buf = *o.Bufp
n, err = syscall.Read(o.fd, o.buf)
if err != nil && err != io.EOF {
if errors.Is(err, syscall.EINVAL) {
if err = disableDirectIO(uintptr(o.fd)); err != nil {
o.err = err
return n, err
}
n, err = syscall.Read(o.fd, o.buf)
}
if err != nil && err != io.EOF {
o.err = err
return n, err
}
}
if n == 0 {
if err == nil {
err = io.EOF
}
o.err = err
return n, err
}
o.err = err
o.buf = o.buf[:n]
o.seenRead = true
}
if len(buf) >= len(o.buf) {
n = copy(buf, o.buf)
o.seenRead = false
return n, o.err
}
n = copy(buf, o.buf)
o.buf = o.buf[n:]
// There is more left in buffer, do not return any EOF yet.
return n, nil
}

// Close - Release the buffer and close the file.
func (o *odirectReader) Close() error {
o.err = errors.New("internal error: odirectReader Read after Close")
return syscall.Close(o.fd)
}

type nullWriter struct{}

func (n nullWriter) Write(b []byte) (int, error) {
Expand All @@ -103,21 +39,14 @@ func (n nullWriter) Write(b []byte) (int, error) {

func (d *DrivePerf) runReadTest(ctx context.Context, path string, data []byte) (uint64, error) {
startTime := time.Now()
fd, err := syscall.Open(path, syscall.O_DIRECT|syscall.O_RDONLY, 0o400)
r, err := os.OpenFile(path, syscall.O_DIRECT|os.O_RDONLY, 0o400)
if err != nil {
return 0, err
}
unix.Fadvise(fd, 0, int64(d.FileSize), unix.FADV_SEQUENTIAL)
unix.Fadvise(int(r.Fd()), 0, int64(d.FileSize), unix.FADV_SEQUENTIAL)

of := &odirectReader{
fd: fd,
Bufp: &data,
ctx: ctx,
alignment: d.FileSize%4096 == 0,
}

n, err := io.Copy(&nullWriter{}, of)
of.Close()
n, err := copyAligned(&nullWriter{}, r, data, int64(d.FileSize), r.Fd())
r.Close()
if err != nil {
return 0, err
}
Expand Down Expand Up @@ -164,7 +93,7 @@ func (n nullReader) Read(b []byte) (int, error) {
return len(b), nil
}

func newEncReader(ctx context.Context) io.Reader {
func newRandomReader(ctx context.Context) io.Reader {
r, err := rng.NewReader()
if err != nil {
panic(err)
Expand All @@ -183,73 +112,108 @@ func disableDirectIO(fd uintptr) error {
return err
}

func copyAligned(fd int, r io.Reader, alignedBuf []byte, totalSize int64) (written int64, err error) {
defer func() {
ferr := fdatasync(fd)
if ferr != nil {
// preserve error on fdatasync
err = ferr
}
cerr := syscall.Close(fd)
if cerr != nil {
// preserve error on close
err = cerr
}
}()

// Writes remaining bytes in the buffer.
writeUnaligned := func(buf []byte) (remainingWritten int64, err error) {
// Disable O_DIRECT on fd's on unaligned buffer
// perform an amortized Fdatasync(fd) on the fd at
// the end, this is performed by the caller before
// closing 'w'.
if err = disableDirectIO(uintptr(fd)); err != nil {
return remainingWritten, err
}
n, err := syscall.Write(fd, buf)
return int64(n), err
// DirectioAlignSize - DirectIO alignment needs to be 4K. Defined here as
// directio.AlignSize is defined as 0 in MacOS causing divide by 0 error.
const DirectioAlignSize = 4096

// copyAligned - copies from reader to writer using the aligned input
// buffer, it is expected that input buffer is page aligned to
// 4K page boundaries. Without passing aligned buffer may cause
// this function to return error.
//
// This code is similar in spirit to io.Copy but it is only to be
// used with DIRECT I/O based file descriptor and it is expected that
// input writer *os.File not a generic io.Writer. Make sure to have
// the file opened for writes with syscall.O_DIRECT flag.
func copyAligned(w io.Writer, r io.Reader, alignedBuf []byte, totalSize int64, fd uintptr) (int64, error) {
if totalSize == 0 {
return 0, nil
}

var written int64
for {
buf := alignedBuf
if totalSize != -1 {
if totalSize > 0 {
remaining := totalSize - written
if remaining < int64(len(buf)) {
buf = buf[:remaining]
}
}

if len(buf)%DirectioAlignSize != 0 {
// Disable O_DIRECT on fd's on unaligned buffer
// perform an amortized Fdatasync(fd) on the fd at
// the end, this is performed by the caller before
// closing 'w'.
if err := disableDirectIO(fd); err != nil {
return written, err
}
}

nr, err := io.ReadFull(r, buf)
eof := err == io.EOF || err == io.ErrUnexpectedEOF
eof := errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF)
if err != nil && !eof {
return written, err
}

buf = buf[:nr]
var nw int64
if len(buf)%4096 == 0 {
var n int
var (
n int
un int
nw int64
)

remain := len(buf) % DirectioAlignSize
if remain == 0 {
// buf is aligned for directio write()
n, err = syscall.Write(fd, buf)
n, err = w.Write(buf)
nw = int64(n)
} else {
if remain < len(buf) {
n, err = w.Write(buf[:len(buf)-remain])
if err != nil {
return written, err
}
nw = int64(n)
}

// Disable O_DIRECT on fd's on unaligned buffer
// perform an amortized Fdatasync(fd) on the fd at
// the end, this is performed by the caller before
// closing 'w'.
if err = disableDirectIO(fd); err != nil {
return written, err
}

// buf is not aligned, hence use writeUnaligned()
nw, err = writeUnaligned(buf)
// for the remainder
un, err = w.Write(buf[len(buf)-remain:])
nw += int64(un)
}

if nw > 0 {
written += nw
}

if err != nil {
return written, err
}

if nw != int64(len(buf)) {
return written, io.ErrShortWrite
}

if totalSize != -1 {
if written == totalSize {
return written, nil
}
if totalSize > 0 && written == totalSize {
// we have written the entire stream, return right here.
return written, nil
}

if eof {
// We reached EOF prematurely but we did not write everything
// that we promised that we would write.
if totalSize > 0 && written != totalSize {
return written, io.ErrUnexpectedEOF
}
return written, nil
}
}
Expand All @@ -261,20 +225,30 @@ func (d *DrivePerf) runWriteTest(ctx context.Context, path string, data []byte)
}

startTime := time.Now()
fd, err := syscall.Open(path, syscall.O_DIRECT|syscall.O_RDWR|syscall.O_CREAT|syscall.O_TRUNC, 0o600)
w, err := os.OpenFile(path, syscall.O_DIRECT|os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0o600)
if err != nil {
return 0, err
}

n, err := copyAligned(fd, newEncReader(ctx), data, int64(d.FileSize))
n, err := copyAligned(w, newRandomReader(ctx), data, int64(d.FileSize), w.Fd())
if err != nil {
w.Close()
return 0, err
}

if n != int64(d.FileSize) {
w.Close()
return 0, fmt.Errorf("Expected to write %d, wrote %d bytes", d.FileSize, n)
}

if err := fdatasync(int(w.Fd())); err != nil {
return 0, err
}

if err := w.Close(); err != nil {
return 0, err
}

dt := float64(time.Since(startTime))
throughputInSeconds := (float64(d.FileSize) / dt) * float64(time.Second)
return uint64(throughputInSeconds), nil
Expand Down

0 comments on commit 7d71916

Please sign in to comment.