Speed up StreamWriter (hypermodeinc#825)
Use a goroutine to split up the serial part of writing in StreamWriter, allowing another core to work on the table encoding. On my desktop, this can write at a rate of 200 MB/s (1.6 Gbps), finishing 1 billion keys (16-byte keys, 16-byte values) in around 3m20s.
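(Rough sanity check on those numbers: 10^9 entries at 32 B of key+value each is about 32 GB of payload, and 32 GB at 200 MB/s is roughly 160 s, so the reported ~200 s is in the right ballpark once per-entry overhead such as versions, metadata, and value-log headers is included.)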

Changes:
* Use a goroutine to split up the serial part of writing in StreamWriter to speed things up.
* Limit to 3 pending requests at a time.
* Use as many goroutines to process requests as the number of streams.
* Update the badger fill tool to send writes to the StreamWriter concurrently (a condensed sketch of this caller-side pattern follows the change list).
* Do batching based on size instead of count.
* Set the value log head correctly.
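For reference, here is a condensed, caller-side sketch of the pattern the fill tool now uses: split the key range across producer goroutines, tag each range with its own StreamId, cut batches by size (~4 MB) rather than by count, and cap pending batches with a small buffered channel so that a single goroutine calls Write. This is an illustrative sketch, not code from this commit: the package name, the function fillConcurrently, its parameters, and the pb import path are assumptions (chosen to mirror what fill.go imports), and numStreams is expected to be at least 1.

// fillConcurrently is a hypothetical helper, not part of Badger: it writes
// keys [1, num] through a StreamWriter using numStreams producer goroutines.
package fillsketch

import (
    "encoding/binary"
    "sync"

    "github.com/dgraph-io/badger"
    "github.com/dgraph-io/badger/pb"
)

func fillConcurrently(db *badger.DB, num uint64, numStreams uint32, value []byte) error {
    writer := db.NewStreamWriter()
    if err := writer.Prepare(); err != nil {
        return err
    }

    writeCh := make(chan *pb.KVList, 3) // at most 3 batches pending at a time
    var wg sync.WaitGroup

    // produce writes keys in [start, end) as one stream, cutting a batch
    // whenever roughly 4 MB of key/value data has accumulated.
    produce := func(start, end uint64, streamId uint32) {
        defer wg.Done()
        kvs := &pb.KVList{}
        sz := 0
        for i := start; i < end; i++ {
            key := make([]byte, 8)
            binary.BigEndian.PutUint64(key, i)
            kvs.Kv = append(kvs.Kv, &pb.KV{Key: key, Value: value, Version: 1, StreamId: streamId})
            sz += len(key) + len(value)
            if sz >= 4<<20 {
                writeCh <- kvs
                kvs, sz = &pb.KVList{}, 0
            }
        }
        writeCh <- kvs // flush the final partial batch for this stream
    }

    // Split [1, num] into numStreams contiguous ranges, one goroutine each.
    width := num/uint64(numStreams) + 1
    streamId := uint32(0)
    for start := uint64(1); start <= num; start += width {
        end := start + width
        if end > num+1 {
            end = num + 1
        }
        streamId++
        wg.Add(1)
        go produce(start, end, streamId)
    }
    go func() {
        wg.Wait()
        close(writeCh)
    }()

    // StreamWriter.Write must not be called concurrently, so one goroutine
    // drains the channel; the per-stream fan-out happens inside StreamWriter.
    for kvs := range writeCh {
        if err := writer.Write(kvs); err != nil {
            return err
        }
    }
    return writer.Flush()
}

The small channel buffer is what implements the "limit to 3 pending requests" item above: producers block once three batches are waiting, which keeps memory bounded while Write and the per-stream table builders catch up.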
manishrjain authored May 29, 2019
1 parent e9447c9 commit 1725096
Showing 2 changed files with 129 additions and 57 deletions.
60 changes: 45 additions & 15 deletions badger/cmd/fill.go
@@ -20,6 +20,7 @@ import (
     "encoding/binary"
     "log"
     "math/rand"
+    "sync"
     "time"

     "github.com/dgraph-io/badger"
@@ -79,27 +80,56 @@ func fillSorted(db *badger.DB, num uint64) error {
     if err := writer.Prepare(); err != nil {
         return err
     }
-    kvs := &pb.KVList{}
-    for i := uint64(1); i <= num; i++ {
-        key := make([]byte, 8)
-        binary.BigEndian.PutUint64(key, i)
-        kvs.Kv = append(kvs.Kv, &pb.KV{
-            Key:     key,
-            Value:   value,
-            Version: 1,
-        })
-        if len(kvs.Kv) > 1000 {
-            if err := writer.Write(kvs); err != nil {
-                return err
+
+    wg := &sync.WaitGroup{}
+    writeCh := make(chan *pb.KVList, 3)
+    writeRange := func(start, end uint64, streamId uint32) {
+        // end is not included.
+        defer wg.Done()
+        kvs := &pb.KVList{}
+        var sz int
+        for i := start; i < end; i++ {
+            key := make([]byte, 8)
+            binary.BigEndian.PutUint64(key, i)
+            kvs.Kv = append(kvs.Kv, &pb.KV{
+                Key:      key,
+                Value:    value,
+                Version:  1,
+                StreamId: streamId,
+            })
+            sz += len(key) + len(value)
+            if sz >= 4<<20 { // 4 MB
+                writeCh <- kvs
+                kvs = &pb.KVList{}
+                sz = 0
             }
-            kvs = &pb.KVList{}
         }
+        writeCh <- kvs
     }
-    if len(kvs.Kv) > 0 {
+
+    // Let's create some streams.
+    width := num / 16
+    streamId := uint32(0)
+    for start := uint64(0); start < num; start += width {
+        end := start + width
+        if end > num {
+            end = num
+        }
+        streamId++
+        wg.Add(1)
+        go writeRange(start, end, streamId)
+    }
+    go func() {
+        wg.Wait()
+        close(writeCh)
+    }()
+    log.Printf("Max StreamId used: %d. Width: %d\n", streamId, width)
+    for kvs := range writeCh {
         if err := writer.Write(kvs); err != nil {
-            return err
+            panic(err)
         }
     }
+    log.Println("DONE streaming. Flushing...")
     return writer.Flush()
 }

126 changes: 84 additions & 42 deletions stream_writer.go
@@ -44,9 +44,9 @@ type StreamWriter struct {
     db         *DB
     done       func()
     throttle   *y.Throttle
-    head       valuePointer
     maxVersion uint64
     writers    map[uint32]*sortedWriter
+    closer     *y.Closer
 }

 // NewStreamWriter creates a StreamWriter. Right after creating StreamWriter, Prepare must be
@@ -60,6 +60,7 @@ func (db *DB) NewStreamWriter() *StreamWriter {
         // concurrent streams being processed.
         throttle: y.NewThrottle(16),
         writers:  make(map[uint32]*sortedWriter),
+        closer:   y.NewCloser(0),
     }
 }

@@ -74,9 +75,12 @@ func (sw *StreamWriter) Prepare() error {
 }

 // Write writes KVList to DB. Each KV within the list contains the stream id which StreamWriter
-// would use to demux the writes.
+// would use to demux the writes. Write is not thread safe and it should NOT be called concurrently.
 func (sw *StreamWriter) Write(kvs *pb.KVList) error {
-    var entries []*Entry
+    if len(kvs.GetKv()) == 0 {
+        return nil
+    }
+    streamReqs := make(map[uint32]*request)
     for _, kv := range kvs.Kv {
         var meta, userMeta byte
         if len(kv.Meta) > 0 {
@@ -98,50 +102,28 @@ func (sw *StreamWriter) Write(kvs *pb.KVList) error {
         // If the value can be colocated with the key in LSM tree, we can skip
         // writing the value to value log.
         e.skipVlog = sw.db.shouldWriteValueToLSM(*e)
-        entries = append(entries, e)
+        req := streamReqs[kv.StreamId]
+        if req == nil {
+            req = &request{}
+            streamReqs[kv.StreamId] = req
+        }
+        req.Entries = append(req.Entries, e)
     }
-    req := &request{
-        Entries: entries,
+    var all []*request
+    for _, req := range streamReqs {
+        all = append(all, req)
     }
-    y.AssertTrue(len(kvs.Kv) == len(req.Entries))
-    if err := sw.db.vlog.write([]*request{req}); err != nil {
+    if err := sw.db.vlog.write(all); err != nil {
         return err
     }

-    for i, kv := range kvs.Kv {
-        e := req.Entries[i]
-        vptr := req.Ptrs[i]
-        if !vptr.IsZero() {
-            y.AssertTrue(sw.head.Less(vptr))
-            sw.head = vptr
-        }
-
-        writer, ok := sw.writers[kv.StreamId]
+    for streamId, req := range streamReqs {
+        writer, ok := sw.writers[streamId]
         if !ok {
-            writer = sw.newWriter(kv.StreamId)
-            sw.writers[kv.StreamId] = writer
-        }
-
-        var vs y.ValueStruct
-        if e.skipVlog {
-            vs = y.ValueStruct{
-                Value:     e.Value,
-                Meta:      e.meta,
-                UserMeta:  e.UserMeta,
-                ExpiresAt: e.ExpiresAt,
-            }
-        } else {
-            vbuf := make([]byte, vptrSize)
-            vs = y.ValueStruct{
-                Value:     vptr.Encode(vbuf),
-                Meta:      e.meta | bitValuePointer,
-                UserMeta:  e.UserMeta,
-                ExpiresAt: e.ExpiresAt,
-            }
-        }
-        if err := writer.Add(e.Key, vs); err != nil {
-            return err
+            writer = sw.newWriter(streamId)
+            sw.writers[streamId] = writer
         }
+        writer.reqCh <- req
     }
     return nil
 }
@@ -150,15 +132,21 @@ func (sw *StreamWriter) Write(kvs *pb.KVList) error {
 // updates Oracle with maxVersion found in all entries (if DB is not managed).
 func (sw *StreamWriter) Flush() error {
     defer sw.done()
+
+    sw.closer.SignalAndWait()
+    var maxHead valuePointer
     for _, writer := range sw.writers {
         if err := writer.Done(); err != nil {
             return err
         }
+        if maxHead.Less(writer.head) {
+            maxHead = writer.head
+        }
     }

     // Encode and write the value log head into a new table.
     data := make([]byte, vptrSize)
-    sw.head.Encode(data)
+    maxHead.Encode(data)
     headWriter := sw.newWriter(headStreamId)
     if err := headWriter.Add(
         y.KeyWithTs(head, sw.maxVersion),
@@ -198,20 +186,74 @@ type sortedWriter struct {
     builder  *table.Builder
     lastKey  []byte
     streamId uint32
+    reqCh    chan *request
+    head     valuePointer
 }

 func (sw *StreamWriter) newWriter(streamId uint32) *sortedWriter {
-    return &sortedWriter{
+    w := &sortedWriter{
         db:       sw.db,
         streamId: streamId,
         throttle: sw.throttle,
         builder:  table.NewTableBuilder(),
+        reqCh:    make(chan *request, 3),
     }
+    sw.closer.AddRunning(1)
+    go w.handleRequests(sw.closer)
+    return w
 }

 // ErrUnsortedKey is returned when any out of order key arrives at sortedWriter during call to Add.
 var ErrUnsortedKey = errors.New("Keys not in sorted order")

+func (w *sortedWriter) handleRequests(closer *y.Closer) {
+    defer closer.Done()
+
+    process := func(req *request) {
+        for i, e := range req.Entries {
+            vptr := req.Ptrs[i]
+            if !vptr.IsZero() {
+                y.AssertTrue(w.head.Less(vptr))
+                w.head = vptr
+            }
+
+            var vs y.ValueStruct
+            if e.skipVlog {
+                vs = y.ValueStruct{
+                    Value:     e.Value,
+                    Meta:      e.meta,
+                    UserMeta:  e.UserMeta,
+                    ExpiresAt: e.ExpiresAt,
+                }
+            } else {
+                vbuf := make([]byte, vptrSize)
+                vs = y.ValueStruct{
+                    Value:     vptr.Encode(vbuf),
+                    Meta:      e.meta | bitValuePointer,
+                    UserMeta:  e.UserMeta,
+                    ExpiresAt: e.ExpiresAt,
+                }
+            }
+            if err := w.Add(e.Key, vs); err != nil {
+                panic(err)
+            }
+        }
+    }
+
+    for {
+        select {
+        case req := <-w.reqCh:
+            process(req)
+        case <-closer.HasBeenClosed():
+            close(w.reqCh)
+            for req := range w.reqCh {
+                process(req)
+            }
+            return
+        }
+    }
+}
+
 // Add adds key and vs to sortedWriter.
 func (w *sortedWriter) Add(key []byte, vs y.ValueStruct) error {
     if bytes.Compare(key, w.lastKey) <= 0 {
